use super::execution_unit::QueryHash;
use super::module_subscription_manager::{
    from_tx_offset, spawn_send_worker, BroadcastError, BroadcastQueue, Plan, SubscriptionGaugeStats,
    SubscriptionManager, TransactionOffset,
};
use super::query::compile_query_with_hashes;
use super::tx::DeltaTx;
use super::{collect_table_update, TableUpdateType};
use crate::client::messages::{
    ProcedureResultMessage, SerializableMessage, SubscriptionData, SubscriptionError, SubscriptionMessage,
    SubscriptionResult, SubscriptionRows, SubscriptionUpdateMessage, TransactionUpdateMessage,
};
use crate::client::{ClientActorId, ClientConnectionSender, Protocol};
use crate::db::relational_db::{MutTx, RelationalDB, Tx};
use crate::error::DBError;
use crate::estimation::estimate_rows_scanned;
use crate::host::module_host::{DatabaseUpdate, EventStatus, ModuleEvent};
use crate::host::ModuleHost;
use crate::messages::websocket::Subscribe;
use crate::subscription::query::is_subscribe_to_all_tables;
use crate::subscription::{collect_table_update_for_view, execute_plans};
use crate::util::prometheus_handle::IntGaugeExt;
use crate::vm::check_row_limit;
use crate::worker_metrics::WORKER_METRICS;
use parking_lot::RwLock;
use prometheus::{Histogram, HistogramTimer, IntCounter, IntGauge};
use scopeguard::ScopeGuard;
use spacetimedb_client_api_messages::websocket::{
    self as ws, BsatnFormat, FormatSwitch, JsonFormat, SubscribeMulti, SubscribeSingle, TableUpdate, Unsubscribe,
    UnsubscribeMulti,
};
use spacetimedb_datastore::db_metrics::DB_METRICS;
use spacetimedb_datastore::execution_context::{Workload, WorkloadType};
use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics;
use spacetimedb_datastore::locking_tx_datastore::{MutTxId, TxId};
use spacetimedb_datastore::traits::{IsolationLevel, TxData};
use spacetimedb_durability::TxOffset;
use spacetimedb_execution::pipelined::{PipelinedProject, ViewProject};
use spacetimedb_expr::expr::CollectViews;
use spacetimedb_lib::identity::AuthCtx;
use spacetimedb_lib::metrics::ExecutionMetrics;
use spacetimedb_lib::Identity;
use spacetimedb_primitives::ArgId;
use std::collections::HashSet;
use std::{sync::Arc, time::Instant};
use tokio::sync::oneshot;

/// Shared, lock-protected handle to the [`SubscriptionManager`].
type Subscriptions = Arc<RwLock<SubscriptionManager>>;

/// Manages a module's query subscriptions: adding and removing
/// subscriptions, and broadcasting transaction updates to subscribers.
#[derive(Clone)]
pub struct ModuleSubscriptions {
    relational_db: Arc<RelationalDB>,
    /// If taking a lock (tx) on the db at the same time, ALWAYS lock the db first.
    /// You will deadlock otherwise.
    subscriptions: Subscriptions,
    /// Queue feeding the send worker that delivers messages to clients.
    broadcast_queue: BroadcastQueue,
    /// Per-database subscription gauges, refreshed via `update_gauges`.
    stats: Arc<SubscriptionGauges>,
}

/// Prometheus gauges tracking subscription counts for a single database.
#[derive(Debug, Clone)]
pub struct SubscriptionGauges {
    /// Identity of the database these gauges are labeled with;
    /// kept so the label values can be removed on `unregister`.
    db_identity: Identity,
    num_queries: IntGauge,
    num_connections: IntGauge,
    num_subscription_sets: IntGauge,
    num_query_subscriptions: IntGauge,
    num_legacy_subscriptions: IntGauge,
}

impl SubscriptionGauges {
    /// Fetch (registering if necessary) the gauges labeled with `db_identity`
    /// from the global metric registries.
    fn new(db_identity: &Identity) -> Self {
        let num_queries = WORKER_METRICS.subscription_queries.with_label_values(db_identity);
        let num_connections = DB_METRICS.subscription_connections.with_label_values(db_identity);
        let num_subscription_sets = DB_METRICS.subscription_sets.with_label_values(db_identity);
        let num_query_subscriptions = DB_METRICS.total_query_subscriptions.with_label_values(db_identity);
        let num_legacy_subscriptions = DB_METRICS.num_legacy_subscriptions.with_label_values(db_identity);
        Self {
            db_identity: *db_identity,
            num_queries,
            num_connections,
            num_subscription_sets,
            num_query_subscriptions,
            num_legacy_subscriptions,
        }
    }

    // Clear the subscription gauges for this database.
    // Removal failures (e.g. labels never registered) are deliberately ignored.
    fn unregister(&self) {
        let _ = WORKER_METRICS
            .subscription_queries
            .remove_label_values(&self.db_identity);
        let _ = DB_METRICS
            .subscription_connections
            .remove_label_values(&self.db_identity);
        let _ = DB_METRICS.subscription_sets.remove_label_values(&self.db_identity);
        let _ = DB_METRICS
            .total_query_subscriptions
            .remove_label_values(&self.db_identity);
        let _ = DB_METRICS
            .num_legacy_subscriptions
            .remove_label_values(&self.db_identity);
    }

    /// Publish the computed statistics into the Prometheus gauges.
    fn report(&self, stats: &SubscriptionGaugeStats) {
        self.num_queries.set(stats.num_queries as i64);
        self.num_connections.set(stats.num_connections as i64);
        self.num_subscription_sets.set(stats.num_subscription_sets as i64);
        self.num_query_subscriptions.set(stats.num_query_subscriptions as i64);
        self.num_legacy_subscriptions.set(stats.num_legacy_subscriptions as i64);
    }
}

/// Prometheus handles for metrics recorded while servicing a
/// subscribe/unsubscribe workload.
pub struct SubscriptionMetrics {
    /// How many tasks are currently waiting on the subscription lock.
    pub lock_waiters: IntGauge,
    /// Time spent waiting to acquire the subscription lock.
    pub lock_wait_time: Histogram,
    /// Time spent compiling (or looking up cached) subscription queries.
    pub compilation_time: Histogram,
    /// Total number of queries requested across subscriptions.
    pub num_queries_subscribed: IntCounter,
    /// Number of queries that were not cached and had to be compiled.
    pub num_new_queries_subscribed: IntCounter,
    /// Number of queries actually evaluated to produce results.
    pub num_queries_evaluated: IntCounter,
}

impl SubscriptionMetrics {
    /// Fetch the metric handles labeled with database `db` and the given
    /// `workload` (e.g. subscribe vs. unsubscribe).
    pub fn new(db: &Identity, workload: &WorkloadType) -> Self {
        Self {
            lock_waiters: DB_METRICS.subscription_lock_waiters.with_label_values(db, workload),
            lock_wait_time: DB_METRICS.subscription_lock_wait_time.with_label_values(db, workload),
            compilation_time: DB_METRICS.subscription_compile_time.with_label_values(db, workload),
            num_queries_subscribed: DB_METRICS.num_queries_subscribed.with_label_values(db),
            num_new_queries_subscribed: DB_METRICS.num_new_queries_subscribed.with_label_values(db),
            num_queries_evaluated: DB_METRICS.num_queries_evaluated.with_label_values(db, workload),
        }
    }
}

/// Inner result type of [`ModuleSubscriptions::commit_and_broadcast_event`].
pub type CommitAndBroadcastEventResult = Result<CommitAndBroadcastEventSuccess, WriteConflict>;

/// `Ok` side of a [`CommitAndBroadcastEventResult`].
pub struct CommitAndBroadcastEventSuccess {
    /// Offset of the committed transaction.
    pub tx_offset: TransactionOffset,
    /// The module event that was committed and broadcast.
    pub event: Arc<ModuleEvent>,
    /// Execution metrics gathered while evaluating subscription updates.
    pub metrics: ExecutionMetrics,
}

/// Commits `tx`
/// and evaluates and broadcasts subscriptions updates.
///
/// # Panics
///
/// Panics if committing the transaction fails internally, and currently
/// also on [`WriteConflict`], since retry logic for write skew has not
/// been implemented yet.
pub(crate) fn commit_and_broadcast_event(
    subs: &ModuleSubscriptions,
    client: Option<Arc<ClientConnectionSender>>,
    event: ModuleEvent,
    tx: MutTxId,
) -> CommitAndBroadcastEventSuccess {
    // `expect` rather than `unwrap` so the inner error's context is
    // attributed to this call site when it surfaces in a panic message.
    match subs
        .commit_and_broadcast_event(client, event, tx)
        .expect("failed to commit transaction and broadcast subscription updates")
    {
        Ok(res) => res,
        Err(WriteConflict) => todo!("Write skew, you need to implement retries my man, T-dawg."),
    }
}

/// Callback used in tests to make assertions against the read transaction
/// (see the `#[cfg(test)]` blocks in the `add_*_subscription` methods).
type AssertTxFn = Arc<dyn Fn(&Tx) + Send + Sync + 'static>;
/// A single table update, in BSATN or JSON depending on the client protocol.
type SubscriptionUpdate = FormatSwitch<TableUpdate<BsatnFormat>, TableUpdate<JsonFormat>>;
/// A full database update, in BSATN or JSON depending on the client protocol.
type FullSubscriptionUpdate = FormatSwitch<ws::DatabaseUpdate<BsatnFormat>, ws::DatabaseUpdate<JsonFormat>>;

/// A utility for sending an error message to a client and returning early.
///
/// On `Err`, invokes `$handler` with the error's display string and makes
/// the enclosing function return `Ok($metrics)`; on `Ok`, evaluates to the
/// contained value.
macro_rules! return_on_err {
    ($expr:expr, $handler:expr, $metrics:expr) => {
        match $expr {
            Ok(val) => val,
            Err(e) => {
                // TODO: Handle errors sending messages.
                let _ = $handler(e.to_string().into());
                return Ok($metrics);
            }
        }
    };
}

/// A utility for sending an error message to a client and returning early.
///
/// Like `return_on_err!`, but first wraps the error in [`DBError::WithSql`]
/// so the offending SQL text is included, and always early-returns `Ok(None)`.
macro_rules! return_on_err_with_sql {
    ($expr:expr, $sql:expr, $handler:expr) => {
        match $expr.map_err(|err| DBError::WithSql {
            sql: $sql.into(),
            error: Box::new(DBError::Other(err.into())),
        }) {
            Ok(val) => val,
            Err(e) => {
                // TODO: Handle errors sending messages.
                let _ = $handler(e.to_string().into());
                return Ok(None);
            }
        }
    };
}

impl ModuleSubscriptions {
    /// Create a new [`ModuleSubscriptions`] for the given database,
    /// subscription manager, and broadcast (send-worker) queue.
    pub fn new(
        relational_db: Arc<RelationalDB>,
        subscriptions: Subscriptions,
        broadcast_queue: BroadcastQueue,
    ) -> Self {
        // Register the per-database subscription gauges up front.
        let stats = Arc::new(SubscriptionGauges::new(&relational_db.database_identity()));
        Self {
            relational_db,
            subscriptions,
            broadcast_queue,
            stats,
        }
    }

    /// Construct a new [`ModuleSubscriptions`] for use in testing,
    /// creating a new [`tokio::runtime::Runtime`] to run its send worker.
    pub fn for_test_new_runtime(db: Arc<RelationalDB>) -> (ModuleSubscriptions, tokio::runtime::Runtime) {
        let runtime = tokio::runtime::Runtime::new().unwrap();
        // Enter the runtime so the send worker is spawned onto it.
        let guard = runtime.enter();
        let subs = Self::for_test_enclosing_runtime(db);
        drop(guard);
        (subs, runtime)
    }

    /// Construct a new [`ModuleSubscriptions`] for use in testing,
    /// running its send worker on the dynamically enclosing [`tokio::runtime::Runtime`]
    pub fn for_test_enclosing_runtime(db: Arc<RelationalDB>) -> ModuleSubscriptions {
        // NOTE(review): `None` here presumably opts the send worker out of
        // some optional configuration (e.g. metrics) -- confirm against
        // `spawn_send_worker`'s signature.
        let send_worker_queue = spawn_send_worker(None);
        ModuleSubscriptions::new(
            db,
            SubscriptionManager::for_test_without_metrics_arc_rwlock(),
            send_worker_queue,
        )
    }

    /// Returns the [`RelationalDB`] of this [`ModuleSubscriptions`].
    ///
    /// This is used by [`ModuleInfo`] and in turn by `InstanceCommon`.
    pub fn relational_db(&self) -> &Arc<RelationalDB> {
        &self.relational_db
    }

    /// Recompute the subscription gauge statistics and publish them
    /// to the Prometheus gauges for this database.
    pub fn update_gauges(&self) {
        let stats = self.subscriptions.read().calculate_gauge_stats();
        self.stats.report(&stats);
    }

    // Remove the subscription gauges for this database,
    // unregistering their label values from the global metric registries.
    // TODO: This should be called when the database is shut down.
    pub fn remove_gauges(&self) {
        self.stats.unregister();
    }

    /// Run auth and row limit checks for a new subscriber, then compute the initial query results.
    ///
    /// The rows are collected in the format matching the sender's protocol:
    /// BSATN for binary clients, JSON for text clients.
    fn evaluate_initial_subscription(
        &self,
        sender: Arc<ClientConnectionSender>,
        query: Arc<Plan>,
        tx: &TxId,
        auth: &AuthCtx,
        update_type: TableUpdateType,
    ) -> Result<(SubscriptionUpdate, ExecutionMetrics), DBError> {
        // Reject the subscription if the estimated number of rows scanned
        // across all plan fragments exceeds the configured row limit.
        check_row_limit(
            &[&query],
            &self.relational_db,
            tx,
            |plan, tx| {
                plan.plans_fragments()
                    .map(|plan_fragment| estimate_rows_scanned(tx, plan_fragment.optimized_physical_plan()))
                    .fold(0, |acc, rows_scanned| acc.saturating_add(rows_scanned))
            },
            auth,
        )?;

        let table_id = query.subscribed_table_id();
        let table_name = query.subscribed_table_name();

        // Re-optimize each plan fragment under the caller's auth context.
        let plans = query
            .plans_fragments()
            .map(|fragment| fragment.optimized_physical_plan())
            .cloned()
            .map(|plan| plan.optimize(auth))
            .collect::<Result<Vec<_>, _>>()?;

        // If the query returns a view, we need its metadata so `ViewProject`
        // can strip the view's private columns from the returned rows.
        let view_info = plans
            .first()
            .and_then(|plan| plan.return_table())
            .and_then(|schema| schema.view_info);

        let num_cols = plans
            .first()
            .and_then(|plan| plan.return_table())
            .map(|schema| schema.num_cols())
            .unwrap_or_default();

        let tx = DeltaTx::from(tx);

        // TODO: See the comment on `collect_table_update_for_view`.
        // The following view and non-view branches should be merged together,
        // since the only difference between them is the row type that is returned.
        Ok(match (sender.config.protocol, view_info) {
            (Protocol::Binary, Some(view_info)) => {
                let plans = plans
                    .into_iter()
                    .map(PipelinedProject::from)
                    .map(|plan| ViewProject::new(plan, num_cols, view_info.num_private_cols()))
                    .collect::<Vec<_>>();
                collect_table_update_for_view(&plans, table_id, table_name.into(), &tx, update_type)
                    .map(|(table_update, metrics)| (FormatSwitch::Bsatn(table_update), metrics))
            }
            (Protocol::Binary, None) => {
                let plans = plans.into_iter().map(PipelinedProject::from).collect::<Vec<_>>();
                collect_table_update(&plans, table_id, table_name.into(), &tx, update_type)
                    .map(|(table_update, metrics)| (FormatSwitch::Bsatn(table_update), metrics))
            }
            (Protocol::Text, Some(view_info)) => {
                let plans = plans
                    .into_iter()
                    .map(PipelinedProject::from)
                    .map(|plan| ViewProject::new(plan, num_cols, view_info.num_private_cols()))
                    .collect::<Vec<_>>();
                collect_table_update_for_view(&plans, table_id, table_name.into(), &tx, update_type)
                    .map(|(table_update, metrics)| (FormatSwitch::Json(table_update), metrics))
            }
            (Protocol::Text, None) => {
                let plans = plans.into_iter().map(PipelinedProject::from).collect::<Vec<_>>();
                collect_table_update(&plans, table_id, table_name.into(), &tx, update_type)
                    .map(|(table_update, metrics)| (FormatSwitch::Json(table_update), metrics))
            }
        }?)
    }

    /// Run auth and row limit checks for `queries`, then evaluate all of them,
    /// returning a full database update formatted per the sender's protocol.
    fn evaluate_queries(
        &self,
        sender: Arc<ClientConnectionSender>,
        queries: &[Arc<Plan>],
        tx: &TxId,
        auth: &AuthCtx,
        update_type: TableUpdateType,
    ) -> Result<(FullSubscriptionUpdate, ExecutionMetrics), DBError> {
        // Reject the request if the estimated number of rows scanned
        // across all plan fragments exceeds the configured row limit.
        check_row_limit(
            queries,
            &self.relational_db,
            tx,
            |plan, tx| {
                plan.plans_fragments()
                    .map(|plan_fragment| estimate_rows_scanned(tx, plan_fragment.optimized_physical_plan()))
                    .fold(0, |acc, rows_scanned| acc.saturating_add(rows_scanned))
            },
            auth,
        )?;

        let tx = DeltaTx::from(tx);
        match sender.config.protocol {
            Protocol::Binary => {
                let (update, metrics) = execute_plans(auth, queries, &tx, update_type)?;
                Ok((FormatSwitch::Bsatn(update), metrics))
            }
            Protocol::Text => {
                let (update, metrics) = execute_plans(auth, queries, &tx, update_type)?;
                Ok((FormatSwitch::Json(update), metrics))
            }
        }
    }

    /// Add a subscription to a single query.
    ///
    /// On query compilation or evaluation failure, an error message is sent
    /// to the client and `Ok(None)` is returned; `Err` is reserved for
    /// internal failures.
    #[tracing::instrument(level = "trace", skip_all)]
    pub async fn add_single_subscription(
        &self,
        host: Option<&ModuleHost>,
        sender: Arc<ClientConnectionSender>,
        auth: AuthCtx,
        request: SubscribeSingle,
        timer: Instant,
        _assert: Option<AssertTxFn>,
    ) -> Result<Option<ExecutionMetrics>, DBError> {
        // Send an error message to the client
        let send_err_msg = |message| {
            self.broadcast_queue.send_client_message(
                sender.clone(),
                None,
                SubscriptionMessage {
                    request_id: Some(request.request_id),
                    query_id: Some(request.query_id),
                    timer: Some(timer),
                    result: SubscriptionResult::Error(SubscriptionError {
                        table_id: None,
                        message,
                    }),
                },
            )
        };

        // Hash the query both without and with parameterization, so either
        // cached form can be reused (see the doc comment on `compile_queries`).
        let sql = request.query;
        let hash = QueryHash::from_string(&sql, auth.caller(), false);
        let hash_with_param = QueryHash::from_string(&sql, auth.caller(), true);

        // Always lock the db before the subscription lock to avoid deadlocks.
        let (mut_tx, _) = self.begin_mut_tx(Workload::Subscribe);

        let existing_query = {
            let guard = self.subscriptions.read();
            guard.query(&hash)
        };

        // Reuse a cached plan if one exists; otherwise compile the query.
        let query = return_on_err_with_sql!(
            existing_query.map(Ok).unwrap_or_else(|| compile_query_with_hashes(
                &auth,
                &*mut_tx,
                &sql,
                hash,
                hash_with_param
            )
            .map(Arc::new)),
            sql,
            send_err_msg
        );

        // Take ownership of the tx out of its guard before downgrading it.
        let mut_tx = ScopeGuard::<MutTxId, _>::into_inner(mut_tx);
        let (tx, tx_offset) = self
            .materialize_views_and_downgrade_tx(host, mut_tx, &query, auth.caller())
            .await?;

        let (table_rows, metrics) = return_on_err_with_sql!(
            self.evaluate_initial_subscription(sender.clone(), query.clone(), &tx, &auth, TableUpdateType::Subscribe),
            query.sql(),
            send_err_msg
        );

        // It acquires the subscription lock after `eval`, allowing `add_subscription` to run concurrently.
        // This also makes it possible for `broadcast_event` to get scheduled before the subsequent part here
        // but that should not pose an issue.
        let mut subscriptions = self.subscriptions.write();
        subscriptions.add_subscription(sender.clone(), query.clone(), request.query_id)?;

        #[cfg(test)]
        if let Some(assert) = _assert {
            assert(&tx);
        }

        // Note: to make sure transaction updates are consistent, we need to put this in the broadcast
        // queue while we are still holding a read-lock on the database.

        // That will avoid race conditions because reducers first grab a write lock on the db, then
        // grab a read lock on the subscriptions.

        // Holding a write lock on `self.subscriptions` would also be sufficient.
        let _ = self.broadcast_queue.send_client_message(
            sender.clone(),
            Some(tx_offset),
            SubscriptionMessage {
                request_id: Some(request.request_id),
                query_id: Some(request.query_id),
                timer: Some(timer),
                result: SubscriptionResult::Subscribe(SubscriptionRows {
                    table_id: query.subscribed_table_id(),
                    table_name: query.subscribed_table_name().into(),
                    table_rows,
                }),
            },
        );
        Ok(Some(metrics))
    }

    /// Remove a subscription for a single query.
    ///
    /// Evaluates the rows the client will no longer receive and sends them
    /// back in an `Unsubscribe` result message.
    pub fn remove_single_subscription(
        &self,
        sender: Arc<ClientConnectionSender>,
        auth: AuthCtx,
        request: Unsubscribe,
        timer: Instant,
    ) -> Result<Option<ExecutionMetrics>, DBError> {
        // Send an error message to the client
        let send_err_msg = |message| {
            self.broadcast_queue.send_client_message(
                sender.clone(),
                None,
                SubscriptionMessage {
                    request_id: Some(request.request_id),
                    query_id: Some(request.query_id),
                    timer: Some(timer),
                    result: SubscriptionResult::Error(SubscriptionError {
                        table_id: None,
                        message,
                    }),
                },
            )
        };

        let mut subscriptions = self.subscriptions.write();

        let queries = return_on_err!(
            subscriptions.remove_subscription((sender.id.identity, sender.id.connection_id), request.query_id),
            // Apparently we ignore errors sending messages.
            send_err_msg,
            None
        );
        // This is technically a bug, since this could be empty if the client has another duplicate subscription.
        // This whole function should be removed soon, so I don't think we need to fix it.
        let [query] = &*queries else {
            // Apparently we ignore errors sending messages.
            let _ = send_err_msg("Internal error".into());
            return Ok(None);
        };

        let (tx, tx_offset) = self.unsubscribe_views(query, auth.caller())?;

        let (table_rows, metrics) = return_on_err_with_sql!(
            self.evaluate_initial_subscription(sender.clone(), query.clone(), &tx, &auth, TableUpdateType::Unsubscribe),
            query.sql(),
            send_err_msg
        );

        // Note: to make sure transaction updates are consistent, we need to put this in the broadcast
        // queue while we are still holding a read-lock on the database.

        // That will avoid race conditions because reducers first grab a write lock on the db, then
        // grab a read lock on the subscriptions.

        // Holding a write lock on `self.subscriptions` would also be sufficient.
        let _ = self.broadcast_queue.send_client_message(
            sender.clone(),
            Some(tx_offset),
            SubscriptionMessage {
                request_id: Some(request.request_id),
                query_id: Some(request.query_id),
                timer: Some(timer),
                result: SubscriptionResult::Unsubscribe(SubscriptionRows {
                    table_id: query.subscribed_table_id(),
                    table_name: query.subscribed_table_name().into(),
                    table_rows,
                }),
            },
        );
        Ok(Some(metrics))
    }

    /// Remove a client's subscription for a set of queries.
    ///
    /// Evaluates the rows the client will no longer receive and sends them
    /// back in an `UnsubscribeMulti` result message.
    #[tracing::instrument(level = "trace", skip_all)]
    pub fn remove_multi_subscription(
        &self,
        sender: Arc<ClientConnectionSender>,
        auth: AuthCtx,
        request: UnsubscribeMulti,
        timer: Instant,
    ) -> Result<Option<ExecutionMetrics>, DBError> {
        // Send an error message to the client
        let send_err_msg = |message| {
            self.broadcast_queue.send_client_message(
                sender.clone(),
                None,
                SubscriptionMessage {
                    request_id: Some(request.request_id),
                    query_id: Some(request.query_id),
                    timer: Some(timer),
                    result: SubscriptionResult::Error(SubscriptionError {
                        table_id: None,
                        message,
                    }),
                },
            )
        };

        let database_identity = self.relational_db.database_identity();
        let subscription_metrics = SubscriptionMetrics::new(&database_identity, &WorkloadType::Unsubscribe);

        // Always lock the db before the subscription lock to avoid deadlocks.
        let (mut_tx, _) = self.begin_mut_tx(Workload::Unsubscribe);

        let removed_queries = {
            let _compile_timer = subscription_metrics.compilation_time.start_timer();
            let mut subscriptions = {
                // How contended is the lock?
                let _wait_guard = subscription_metrics.lock_waiters.inc_scope();
                let _wait_timer = subscription_metrics.lock_wait_time.start_timer();
                self.subscriptions.write()
            };

            return_on_err!(
                subscriptions.remove_subscription((sender.id.identity, sender.id.connection_id), request.query_id),
                send_err_msg,
                None
            )
        };

        // Take ownership of the tx out of its guard before downgrading it.
        let mut_tx = ScopeGuard::<MutTxId, _>::into_inner(mut_tx);
        let (tx, tx_offset) = self.unsubscribe_views_and_downgrade_tx(mut_tx, &removed_queries, auth.caller())?;

        let (update, metrics) = return_on_err!(
            self.evaluate_queries(
                sender.clone(),
                &removed_queries,
                &tx,
                &auth,
                TableUpdateType::Unsubscribe,
            ),
            send_err_msg,
            None
        );

        // How many queries did we evaluate?
        subscription_metrics
            .num_queries_evaluated
            .inc_by(removed_queries.len() as _);

        // Note: to make sure transaction updates are consistent, we need to put this in the broadcast
        // queue while we are still holding a read-lock on the database.

        // That will avoid race conditions because reducers first grab a write lock on the db, then
        // grab a read lock on the subscriptions.

        // Holding a write lock on `self.subscriptions` would also be sufficient.
        let _ = self.broadcast_queue.send_client_message(
            sender,
            Some(tx_offset),
            SubscriptionMessage {
                request_id: Some(request.request_id),
                query_id: Some(request.query_id),
                timer: Some(timer),
                result: SubscriptionResult::UnsubscribeMulti(SubscriptionData { data: update }),
            },
        );

        Ok(Some(metrics))
    }

    /// Compiles the queries in a [Subscribe] or [SubscribeMulti] message.
    ///
    /// Note, we hash queries to avoid recompilation,
    /// but we need to know if a query is parameterized in order to hash it correctly.
    /// This requires that we type check which in turn requires that we start a tx.
    ///
    /// Unfortunately parsing with sqlparser is quite expensive,
    /// so we'd like to avoid that cost while holding the tx lock,
    /// especially since all we're trying to do is generate a hash.
    ///
    /// Instead we generate two hashes and outside of the tx lock.
    /// If either one is currently tracked, we can avoid recompilation.
    ///
    /// Returns the compiled (or cached) plans, the auth context, the open
    /// mutable transaction, and the still-running compilation timer, which
    /// the caller stops once the subscription has been registered.
    #[allow(clippy::type_complexity)]
    fn compile_queries(
        &self,
        sender: Identity,
        auth: AuthCtx,
        queries: &[Box<str>],
        num_queries: usize,
        metrics: &SubscriptionMetrics,
    ) -> Result<(Vec<Arc<Plan>>, AuthCtx, MutTxId, HistogramTimer), DBError> {
        let mut subscribe_to_all_tables = false;
        let mut plans = Vec::with_capacity(num_queries);
        let mut query_hashes = Vec::with_capacity(num_queries);

        // Hash each query (both without and with parameterization) before
        // taking the tx lock; "subscribe to all tables" is handled separately.
        for sql in queries {
            let sql = sql.trim();
            if is_subscribe_to_all_tables(sql) {
                subscribe_to_all_tables = true;
                continue;
            }
            let hash = QueryHash::from_string(sql, sender, false);
            let hash_with_param = QueryHash::from_string(sql, sender, true);
            query_hashes.push((sql, hash, hash_with_param));
        }

        // We always get the db lock before the subscription lock to avoid deadlocks.
        let (mut_tx, _tx_offset) = self.begin_mut_tx(Workload::Subscribe);

        let compile_timer = metrics.compilation_time.start_timer();

        let guard = {
            // How contended is the lock?
            let _wait_guard = metrics.lock_waiters.inc_scope();
            let _wait_timer = metrics.lock_wait_time.start_timer();
            self.subscriptions.read()
        };

        if subscribe_to_all_tables {
            plans.extend(
                super::subscription::get_all(
                    |relational_db, tx| relational_db.get_all_tables_mut(tx).map(|schemas| schemas.into_iter()),
                    &self.relational_db,
                    &*mut_tx,
                    &auth,
                )?
                .into_iter()
                .map(Arc::new),
            );
        }

        let mut new_queries = 0;

        // Reuse cached plans where either hash is already tracked;
        // compile only the remainder.
        for (sql, hash, hash_with_param) in query_hashes {
            if let Some(unit) = guard.query(&hash) {
                plans.push(unit);
            } else if let Some(unit) = guard.query(&hash_with_param) {
                plans.push(unit);
            } else {
                plans.push(Arc::new(
                    compile_query_with_hashes(&auth, &*mut_tx, sql, hash, hash_with_param).map_err(|err| {
                        DBError::WithSql {
                            error: Box::new(DBError::Other(err.into())),
                            sql: sql.into(),
                        }
                    })?,
                ));
                new_queries += 1;
            }
        }

        // How many queries in this subscription are not cached?
        metrics.num_new_queries_subscribed.inc_by(new_queries);

        Ok((plans, auth, ScopeGuard::<MutTxId, _>::into_inner(mut_tx), compile_timer))
    }

    /// Send a message to a client connection.
    /// This will eventually be sent by the send-worker.
    /// This takes a `TxId`, because this should be called while still holding a lock on the database.
    pub fn send_client_message(
        &self,
        recipient: Arc<ClientConnectionSender>,
        message: impl Into<SerializableMessage>,
        // `_tx` is unused at runtime; it exists only as proof that the
        // caller still holds a database transaction.
        (_tx, tx_offset): (&TxId, TransactionOffset),
    ) -> Result<(), BroadcastError> {
        self.broadcast_queue
            .send_client_message(recipient, Some(tx_offset), message)
    }

    /// Like [`Self::send_client_message`],
    /// but doesn't require a `TxId` because procedures don't hold a transaction open.
    ///
    /// `tx_offset` may be `None` when the result is not tied to any transaction.
    pub fn send_procedure_message(
        &self,
        recipient: Arc<ClientConnectionSender>,
        message: ProcedureResultMessage,
        tx_offset: Option<TransactionOffset>,
    ) -> Result<(), BroadcastError> {
        self.broadcast_queue.send_client_message(recipient, tx_offset, message)
    }

    /// Add a client subscription to a set of queries ([`SubscribeMulti`]).
    ///
    /// On compilation failure an error message is sent to the client and
    /// `Ok(None)` is returned; on evaluation failure the subscription is
    /// rolled back before the error message is sent.
    #[tracing::instrument(level = "trace", skip_all)]
    pub async fn add_multi_subscription(
        &self,
        host: Option<&ModuleHost>,
        sender: Arc<ClientConnectionSender>,
        auth: AuthCtx,
        request: SubscribeMulti,
        timer: Instant,
        _assert: Option<AssertTxFn>,
    ) -> Result<Option<ExecutionMetrics>, DBError> {
        // Send an error message to the client
        let send_err_msg = |message| {
            let _ = self.broadcast_queue.send_client_message(
                sender.clone(),
                None,
                SubscriptionMessage {
                    request_id: Some(request.request_id),
                    query_id: Some(request.query_id),
                    timer: Some(timer),
                    result: SubscriptionResult::Error(SubscriptionError {
                        table_id: None,
                        message,
                    }),
                },
            );
        };

        let num_queries = request.query_strings.len();

        let database_identity = self.relational_db.database_identity();
        let subscription_metrics = SubscriptionMetrics::new(&database_identity, &WorkloadType::Subscribe);

        // How many queries make up this subscription?
        subscription_metrics.num_queries_subscribed.inc_by(num_queries as _);

        // Compile (or look up cached) plans; this also opens the mutable tx.
        let (queries, auth, mut_tx, compile_timer) = return_on_err!(
            self.compile_queries(
                sender.id.identity,
                auth,
                &request.query_strings,
                num_queries,
                &subscription_metrics
            ),
            send_err_msg,
            None
        );
        let (mut_tx, _) = self.guard_mut_tx(mut_tx, <_>::default());

        // We minimize locking so that other clients can add subscriptions concurrently.
        // We are protected from race conditions with broadcasts, because we have the db lock,
        // an `commit_and_broadcast_event` grabs a read lock on `subscriptions` while it still has a
        // write lock on the db.
        let queries = {
            let mut subscriptions = {
                // How contended is the lock?
                let _wait_guard = subscription_metrics.lock_waiters.inc_scope();
                let _wait_timer = subscription_metrics.lock_wait_time.start_timer();
                self.subscriptions.write()
            };

            subscriptions.add_subscription_multi(sender.clone(), queries, request.query_id)?
        };

        // Record how long it took to compile the subscription
        drop(compile_timer);

        // Take ownership of the tx out of its guard before downgrading it.
        let mut_tx = ScopeGuard::<MutTxId, _>::into_inner(mut_tx);
        let (tx, tx_offset) = self
            .materialize_views_and_downgrade_tx(host, mut_tx, &queries, auth.caller())
            .await?;

        let Ok((update, metrics)) =
            self.evaluate_queries(sender.clone(), &queries, &tx, &auth, TableUpdateType::Subscribe)
        else {
            // If we fail the query, we need to remove the subscription.
            let mut subscriptions = {
                // How contended is the lock?
                let _wait_guard = subscription_metrics.lock_waiters.inc_scope();
                let _wait_timer = subscription_metrics.lock_wait_time.start_timer();
                self.subscriptions.write()
            };
            {
                let _compile_timer = subscription_metrics.compilation_time.start_timer();
                subscriptions.remove_subscription((sender.id.identity, sender.id.connection_id), request.query_id)?;
            }

            send_err_msg("Internal error evaluating queries".into());
            return Ok(None);
        };

        // How many queries did we actually evaluate?
        subscription_metrics.num_queries_evaluated.inc_by(queries.len() as _);

        #[cfg(test)]
        if let Some(assert) = _assert {
            assert(&tx);
        }

        // Note: to make sure transaction updates are consistent, we need to put this in the broadcast
        // queue while we are still holding a read-lock on the database.

        // That will avoid race conditions because reducers first grab a write lock on the db, then
        // grab a read lock on the subscriptions.

        // Holding a write lock on `self.subscriptions` would also be sufficient.

        let _ = self.broadcast_queue.send_client_message(
            sender.clone(),
            Some(tx_offset),
            SubscriptionMessage {
                request_id: Some(request.request_id),
                query_id: Some(request.query_id),
                timer: Some(timer),
                result: SubscriptionResult::SubscribeMulti(SubscriptionData { data: update }),
            },
        );

        Ok(Some(metrics))
    }

    /// Add a subscriber to the module. NOTE: this function is blocking.
    /// This is used for the legacy subscription API which uses a set of queries.
    ///
    /// Compiles `subscription.query_strings`, materializes any views they reference,
    /// evaluates the initial result set, replaces the sender's legacy subscription
    /// set, and enqueues the initial update for delivery via the broadcast queue.
    ///
    /// Returns the [`ExecutionMetrics`] of the initial query evaluation.
    #[tracing::instrument(level = "trace", skip_all)]
    pub async fn add_legacy_subscriber(
        &self,
        host: Option<&ModuleHost>,
        sender: Arc<ClientConnectionSender>,
        auth: AuthCtx,
        subscription: Subscribe,
        timer: Instant,
        _assert: Option<AssertTxFn>,
    ) -> Result<ExecutionMetrics, DBError> {
        let num_queries = subscription.query_strings.len();
        let database_identity = self.relational_db.database_identity();
        let subscription_metrics = SubscriptionMetrics::new(&database_identity, &WorkloadType::Subscribe);

        // How many queries make up this subscription?
        subscription_metrics.num_queries_subscribed.inc_by(num_queries as _);

        let (queries, auth, mut_tx, compile_timer) = self.compile_queries(
            sender.id.identity,
            auth,
            &subscription.query_strings,
            num_queries,
            &subscription_metrics,
        )?;

        // Materialize any views the queries need, then downgrade to the read-only
        // tx under which the initial results will be evaluated.
        let (tx, tx_offset) = self
            .materialize_views_and_downgrade_tx(host, mut_tx, &queries, auth.caller())
            .await?;

        // Reject the subscription up front if its estimated scan cost exceeds the row limit.
        check_row_limit(
            &queries,
            &self.relational_db,
            &tx,
            |plan, tx| {
                plan.plans_fragments()
                    .map(|plan_fragment| estimate_rows_scanned(tx, plan_fragment.optimized_physical_plan()))
                    .fold(0, |acc, rows_scanned| acc.saturating_add(rows_scanned))
            },
            &auth,
        )?;

        // Record how long it took to compile the subscription
        drop(compile_timer);

        let tx = DeltaTx::from(&*tx);
        // Evaluate the initial result set in the encoding matching the client's protocol.
        let (database_update, metrics) = match sender.config.protocol {
            Protocol::Binary => execute_plans(&auth, &queries, &tx, TableUpdateType::Subscribe)
                .map(|(table_update, metrics)| (FormatSwitch::Bsatn(table_update), metrics))?,
            Protocol::Text => execute_plans(&auth, &queries, &tx, TableUpdateType::Subscribe)
                .map(|(table_update, metrics)| (FormatSwitch::Json(table_update), metrics))?,
        };

        // It acquires the subscription lock after `eval`, allowing `add_subscription` to run concurrently.
        // This also makes it possible for `broadcast_event` to get scheduled before the subsequent part here
        // but that should not pose an issue.
        {
            // NOTE(review): this timer wraps taking the lock and replacing the
            // subscription set, not query compilation — confirm the metric's intent.
            let _compile_timer = subscription_metrics.compilation_time.start_timer();

            let mut subscriptions = {
                // How contended is the lock?
                let _wait_guard = subscription_metrics.lock_waiters.inc_scope();
                let _wait_timer = subscription_metrics.lock_wait_time.start_timer();
                self.subscriptions.write()
            };

            // Legacy semantics: the new query set replaces the client's previous one.
            subscriptions.set_legacy_subscription(sender.clone(), queries.into_iter());
        }

        #[cfg(test)]
        if let Some(assert) = _assert {
            assert(&tx);
        }

        // Note: to make sure transaction updates are consistent, we need to put this in the broadcast
        // queue while we are still holding a read-lock on the database.

        // That will avoid race conditions because reducers first grab a write lock on the db, then
        // grab a read lock on the subscriptions.

        // Holding a write lock on `self.subscriptions` would also be sufficient.
        let _ = self.broadcast_queue.send_client_message(
            sender,
            Some(tx_offset),
            SubscriptionUpdateMessage {
                database_update,
                request_id: Some(subscription.request_id),
                timer: Some(timer),
            },
        );

        Ok(metrics)
    }

    /// Drop every subscription held by the given client.
    pub fn remove_subscriber(&self, client_id: ClientActorId) {
        let key = (client_id.identity, client_id.connection_id);
        self.subscriptions.write().remove_all_subscriptions(&key);
    }

    /// Rolls back `tx`, reports its metrics, and returns the offset as it was before `tx`.
    pub(crate) fn rollback_mut_tx(stdb: &RelationalDB, tx: MutTxId) -> TxOffset {
        let (offset, metrics, reducer) = stdb.rollback_mut_tx(tx);
        stdb.report_tx_metrics(reducer, None, Some(metrics), None);
        offset
    }

    /// Commit a transaction and broadcast its ModuleEvent to all interested subscribers.
    ///
    /// On a committed event, commits `tx` with a downgrade to a read-only tx and
    /// evaluates subscription updates against the resulting [`TxData`].
    /// On a failed or out-of-energy event, rolls `tx` back and notifies only the
    /// `caller`, if any.
    ///
    /// The returned [`ExecutionMetrics`] are reported in this method via `report_tx_metrics`.
    /// They are returned for testing purposes but should not be reported separately.
    pub fn commit_and_broadcast_event(
        &self,
        caller: Option<Arc<ClientConnectionSender>>,
        mut event: ModuleEvent,
        tx: MutTx,
    ) -> Result<CommitAndBroadcastEventResult, DBError> {
        let database_identity = self.relational_db.database_identity();
        let subscription_metrics = SubscriptionMetrics::new(&database_identity, &WorkloadType::Update);

        // Take a read lock on `subscriptions` before committing tx
        // else it can result in subscribers receiving duplicate updates.
        let subscriptions = {
            // How contended is the lock?
            let _wait_guard = subscription_metrics.lock_waiters.inc_scope();
            let _wait_timer = subscription_metrics.lock_wait_time.start_timer();
            self.subscriptions.read()
        };

        let stdb = &self.relational_db;
        // Downgrade mutable tx.
        // We'll later ensure tx is released/cleaned up once out of scope.
        let (read_tx, tx_data, tx_metrics_mut) = match &mut event.status {
            EventStatus::Committed(db_update) => {
                // `None` here means the datastore refused the commit; surface it
                // to the caller as a `WriteConflict`.
                let Some((tx_data, tx_metrics, read_tx)) = stdb.commit_tx_downgrade(tx, Workload::Update)? else {
                    return Ok(Err(WriteConflict));
                };
                *db_update = DatabaseUpdate::from_writes(&tx_data);
                (read_tx, Arc::new(tx_data), tx_metrics)
            }
            EventStatus::Failed(_) | EventStatus::OutOfEnergy => {
                // If the transaction failed, we need to rollback the mutable tx.
                // We don't need to do any subscription updates in this case, so we will exit early.

                let event = Arc::new(event);
                let tx_offset = Self::rollback_mut_tx(stdb, tx);
                if let Some(client) = caller {
                    // Only the caller learns of the failure; the update payload is empty.
                    let message = TransactionUpdateMessage {
                        event: Some(event.clone()),
                        database_update: SubscriptionUpdateMessage::default_for_protocol(client.config.protocol, None),
                    };

                    let _ = self
                        .broadcast_queue
                        .send_client_message(client, Some(from_tx_offset(tx_offset)), message);
                } else {
                    log::trace!("Reducer failed but there is no client to send the failure to!")
                }
                return Ok(Ok(CommitAndBroadcastEventSuccess {
                    tx_offset: from_tx_offset(tx_offset),
                    event,
                    metrics: ExecutionMetrics::default(),
                }));
            }
        };
        let event = Arc::new(event);

        // When we're done with this method, release the tx and report metrics.
        let (extra_tx_offset_sender, extra_tx_offset) = oneshot::channel();
        let (mut read_tx, tx_offset) = self.guard_tx(
            read_tx,
            GuardTxOptions::full(extra_tx_offset_sender, Some(tx_data.clone()), tx_metrics_mut),
        );
        // Create the delta transaction we'll use to eval updates against.
        let delta_read_tx = DeltaTx::new(&read_tx, tx_data.as_ref(), subscriptions.index_ids_for_subscriptions());
        let update_metrics = subscriptions.eval_updates_sequential((&delta_read_tx, tx_offset), event.clone(), caller);
        // Fold the update-evaluation metrics into the tx metrics reported when the guard drops.
        read_tx.metrics.merge(update_metrics);
        Ok(Ok(CommitAndBroadcastEventSuccess {
            tx_offset: extra_tx_offset,
            event,
            metrics: update_metrics,
        }))
    }

    /// The same as [`Self::unsubscribe_views_and_downgrade_tx`] but doesn't take a tx handle.
    fn unsubscribe_views(
        &self,
        view_collector: &impl CollectViews,
        sender: Identity,
    ) -> Result<(TxGuard<impl FnOnce(TxId) + '_>, TransactionOffset), DBError> {
        // Open a fresh serializable tx just for the unsubscribe bookkeeping.
        let mut_tx = self
            .relational_db
            .begin_mut_tx(IsolationLevel::Serializable, Workload::Unsubscribe);
        self.unsubscribe_views_and_downgrade_tx(mut_tx, view_collector, sender)
    }

    /// Unsubscribes a caller from the views returned by the `view_collector`,
    /// and subsequently downgrades to a read-only transaction.
    ///
    /// Unlike [`Self::materialize_views_and_downgrade_tx`] which populates the views' backing tables,
    /// this method just decrements the subscriber count in `st_view_sub`.
    /// Views without any subscribers are cleaned up async.
    fn unsubscribe_views_and_downgrade_tx(
        &self,
        mut tx: MutTxId,
        view_collector: &impl CollectViews,
        sender: Identity,
    ) -> Result<(TxGuard<impl FnOnce(TxId) + '_>, TransactionOffset), DBError> {
        Self::_unsubscribe_views(&mut tx, view_collector, sender)?;
        let (tx_data, tx_metrics_mut, read_tx) = tx.commit_downgrade(Workload::Subscribe);
        Ok(self.guard_tx(read_tx, GuardTxOptions::from_mut(tx_data, tx_metrics_mut)))
    }

    /// We unsubscribe from views by decrementing the subscriber count in `st_view_sub`.
    /// Views without any subscribers are cleaned up async.
    fn _unsubscribe_views(
        tx: &mut MutTxId,
        view_collector: &impl CollectViews,
        sender: Identity,
    ) -> Result<(), DBError> {
        // Gather the (deduplicated) set of views referenced by the collector.
        let view_ids = {
            let mut ids = HashSet::new();
            view_collector.collect_views(&mut ids);
            ids
        };
        for id in view_ids {
            tx.unsubscribe_view(id, ArgId::SENTINEL, sender)?;
        }
        Ok(())
    }

    /// Materialize the views returned by the `view_collector`, if not already materialized,
    /// and subsequently downgrade to a read-only transaction.
    async fn materialize_views_and_downgrade_tx(
        &self,
        host: Option<&ModuleHost>,
        tx: MutTxId,
        view_collector: &impl CollectViews,
        sender: Identity,
    ) -> Result<(TxGuard<impl FnOnce(TxId) + '_>, TransactionOffset), DBError> {
        // Without a module host there is nothing to materialize.
        let tx = match host {
            Some(host) => {
                host.materialize_views(tx, view_collector, sender, Workload::Subscribe)
                    .await?
            }
            None => tx,
        };
        let (tx_data, tx_metrics_mut, read_tx) = tx.commit_downgrade(Workload::Subscribe);
        Ok(self.guard_tx(read_tx, GuardTxOptions::from_mut(tx_data, tx_metrics_mut)))
    }

    /// Helper that starts a new mutable transaction, and guards it using
    /// [`Self::guard_mut_tx`] with the default configuration.
    fn begin_mut_tx(&self, workload: Workload) -> (MutTxGuard<impl FnOnce(MutTxId) + '_>, TransactionOffset) {
        self.guard_mut_tx(
            self.relational_db.begin_mut_tx(IsolationLevel::Serializable, workload),
            <_>::default(),
        )
    }

    /// Helper wrapping a [`TxId`] in a scopegard, with a configurable drop fn.
    ///
    /// By default, `tx` is released when the returned [`ScopeGuard`] is dropped,
    /// and reports the transaction metrics via [`RelationalDB::report_tx_metrics`].
    /// The `tx_data` and `tx_metrics_mut` parameters are passed to the metrics
    /// reporting method as-is; they can be used to report additional metrics
    /// about a previous mutable transaction that was downgraded to `tx` after
    /// committing.
    ///
    /// The method returns a [`ScopeGuard`] along with a [`TransactionOffset`].
    /// When the transaction commits, its transaction offset is sent to the
    /// latter (a [`oneshot::Receiver`]).
    /// If another receiver of the transaction offset is needed, its sending
    /// side can be passed in as `extra_tx_offset_sender`. It will be sent the
    /// offset as well.
    fn guard_tx(&self, tx: TxId, opts: GuardTxOptions) -> (TxGuard<impl FnOnce(TxId) + '_>, TransactionOffset) {
        let (offset_tx, offset_rx) = oneshot::channel();
        let guard = scopeguard::guard(tx, |tx| {
            let (tx_offset, tx_metrics, reducer) = self.relational_db.release_tx(tx);
            log::trace!("read tx released with offset {tx_offset}");
            let _ = offset_tx.send(tx_offset);
            if let Some(extra) = opts.extra_tx_offset_sender {
                let _ = extra.send(tx_offset);
            }
            self.relational_db
                .report_tx_metrics(reducer, opts.tx_data, opts.tx_metrics_mut, Some(tx_metrics));
        });
        (guard, offset_rx)
    }

    /// The same as [`Self::guard_tx`] but for mutable transactions.
    ///
    /// By default, `tx` is committed when the returned [`ScopeGuard`] is dropped,
    /// and reports the transaction metrics via [`RelationalDB::report_tx_metrics`].
    fn guard_mut_tx(
        &self,
        tx: MutTxId,
        opts: GuardTxOptions,
    ) -> (MutTxGuard<impl FnOnce(MutTxId) + '_>, TransactionOffset) {
        let (offset_send, offset_recv) = oneshot::channel();
        let guard = scopeguard::guard(tx, |tx| {
            // Commit on drop. An errored or empty commit sends no offset and
            // reports no metrics, matching the silent-ignore semantics here.
            let Ok(Some((tx_offset, tx_data, tx_metrics_mut, reducer))) = self.relational_db.commit_tx(tx) else {
                return;
            };
            log::trace!("mutable tx committed with offset {tx_offset}");
            let _ = offset_send.send(tx_offset);
            if let Some(extra) = opts.extra_tx_offset_sender {
                let _ = extra.send(tx_offset);
            }
            self.relational_db
                .report_tx_metrics(reducer, Some(Arc::new(tx_data)), Some(tx_metrics_mut), None);
        });
        (guard, offset_recv)
    }
}

/// Extra parameters for [`ModuleSubscriptions::guard_tx`].
///
/// The [`Default`] value carries nothing: no extra offset sender and no data
/// or metrics from a preceding mutable transaction.
#[derive(Default)]
struct GuardTxOptions {
    /// Sender for an extra [`oneshot::Receiver`] for the transaction offset.
    extra_tx_offset_sender: Option<oneshot::Sender<TxOffset>>,
    /// [`TxData`] of a preceding mutable transaction.
    tx_data: Option<Arc<TxData>>,
    /// [`TxMetrics`] of a preceding mutable transaction.
    tx_metrics_mut: Option<TxMetrics>,
}

impl GuardTxOptions {
    /// Options carrying everything: an extra offset sender plus the data and
    /// metrics of a preceding mutable transaction.
    fn full(
        extra_tx_offset_sender: oneshot::Sender<TxOffset>,
        tx_data: Option<Arc<TxData>>,
        tx_metrics_mut: TxMetrics,
    ) -> Self {
        Self {
            extra_tx_offset_sender: Some(extra_tx_offset_sender),
            tx_data,
            tx_metrics_mut: Some(tx_metrics_mut),
        }
    }

    /// Options reporting the data and metrics of a mutable transaction that
    /// was committed and downgraded; no extra offset sender.
    fn from_mut(tx_data: TxData, tx_metrics_mut: TxMetrics) -> Self {
        Self {
            extra_tx_offset_sender: None,
            tx_data: Some(Arc::new(tx_data)),
            tx_metrics_mut: Some(tx_metrics_mut),
        }
    }
}

/// Marker returned (inside `Ok(Err(_))`) by `commit_and_broadcast_event`
/// when the datastore reports that the transaction could not be committed.
pub struct WriteConflict;

/// A [`ScopeGuard`] for [`TxId`]
type TxGuard<F> = ScopeGuard<TxId, F>;

/// A [`ScopeGuard`] for [`MutTxId`]
type MutTxGuard<F> = ScopeGuard<MutTxId, F>;

#[cfg(test)]
mod tests {
    use super::{AssertTxFn, ModuleSubscriptions};
    use crate::client::messages::{
        SerializableMessage, SubscriptionData, SubscriptionError, SubscriptionMessage, SubscriptionResult,
        SubscriptionUpdateMessage, TransactionUpdateMessage,
    };
    use crate::client::{
        ClientActorId, ClientConfig, ClientConnectionReceiver, ClientConnectionSender, ClientName, Protocol,
    };
    use crate::db::relational_db::tests_utils::{
        begin_mut_tx, begin_tx, insert, with_auto_commit, with_read_only, TempReplicaDir, TestDB,
    };
    use crate::db::relational_db::{Persistence, RelationalDB, Txdata};
    use crate::error::DBError;
    use crate::host::module_host::{DatabaseUpdate, EventStatus, ModuleEvent, ModuleFunctionCall};
    use crate::messages::websocket as ws;
    use crate::sql::execute::run;
    use crate::subscription::module_subscription_actor::commit_and_broadcast_event;
    use crate::subscription::module_subscription_manager::{spawn_send_worker, SubscriptionManager};
    use crate::subscription::query::compile_read_only_query;
    use crate::subscription::TableUpdateType;
    use core::fmt;
    use hashbrown::HashMap;
    use itertools::Itertools;
    use pretty_assertions::assert_matches;
    use spacetimedb_client_api_messages::energy::EnergyQuanta;
    use spacetimedb_client_api_messages::websocket::{
        CompressableQueryUpdate, Compression, FormatSwitch, QueryId, Subscribe, SubscribeMulti, SubscribeSingle,
        TableUpdate, Unsubscribe, UnsubscribeMulti,
    };
    use spacetimedb_commitlog::{commitlog, repo};
    use spacetimedb_datastore::system_tables::{StRowLevelSecurityRow, ST_ROW_LEVEL_SECURITY_ID};
    use spacetimedb_durability::{Durability, EmptyHistory, TxOffset};
    use spacetimedb_execution::dml::MutDatastore;
    use spacetimedb_lib::bsatn::ToBsatn;
    use spacetimedb_lib::db::auth::StAccess;
    use spacetimedb_lib::identity::AuthCtx;
    use spacetimedb_lib::metrics::ExecutionMetrics;
    use spacetimedb_lib::{bsatn, ConnectionId, ProductType, ProductValue, Timestamp};
    use spacetimedb_lib::{error::ResultTest, AlgebraicType, Identity};
    use spacetimedb_primitives::TableId;
    use spacetimedb_sats::product;
    use std::future::Future;
    use std::pin::pin;
    use std::sync::RwLock;
    use std::task::Poll;
    use std::time::Instant;
    use std::{sync::Arc, time::Duration};
    use tokio::sync::mpsc::{self};
    use tokio::sync::watch;

    /// Set up a `ModuleSubscriptions` over `db` and subscribe a dummy client to `sql`
    /// via the legacy API, blocking until the subscription completes.
    fn add_subscriber(db: Arc<RelationalDB>, sql: &str, assert: Option<AssertTxFn>) -> Result<(), DBError> {
        // A Tokio runtime is required so the `ModuleSubscriptions` background workers can run.
        let runtime = tokio::runtime::Runtime::new().unwrap();
        let _guard = runtime.enter();

        let owner = Identity::from_byte_array([1; 32]);
        let client = ClientActorId::for_test(Identity::ZERO);
        let sender = Arc::new(ClientConnectionSender::dummy(
            client,
            ClientConfig::for_test(),
            (*db).clone(),
        ));
        let auth = AuthCtx::new(owner, sender.auth.claims.identity);

        let module_subscriptions = ModuleSubscriptions::new(
            db.clone(),
            SubscriptionManager::for_test_without_metrics_arc_rwlock(),
            spawn_send_worker(None),
        );

        let subscribe = Subscribe {
            query_strings: [sql.into()].into(),
            request_id: 0,
        };
        runtime
            .block_on(module_subscriptions.add_legacy_subscriber(None, sender, auth, subscribe, Instant::now(), assert))
            .map(drop)
    }

    /// A [`Durability`] for which the durable offset is marked manually.
    struct ManualDurability {
        /// In-memory commitlog holding every appended transaction.
        commitlog: Arc<RwLock<commitlog::Generic<repo::Memory, Txdata>>>,
        /// Watch channel through which the durable offset is published.
        durable_offset: watch::Sender<Option<TxOffset>>,
    }

    impl ManualDurability {
        /// Publish `offset` as the durable offset.
        ///
        /// Panics if `offset` has not been committed to the commitlog yet.
        #[allow(unused)]
        fn mark_durable_at(&self, offset: TxOffset) {
            let committed = self.committed_offset();
            assert!(
                committed.is_some_and(|committed| committed >= offset),
                "given offset is not in the commitlog"
            );
            self.durable_offset.send_modify(|val| {
                val.replace(offset);
            });
        }

        /// Publish the highest committed offset as durable, if there is one.
        fn mark_durable(&self) {
            let Some(offset) = self.committed_offset() else {
                return;
            };
            self.durable_offset.send_modify(|val| {
                val.replace(offset);
            });
        }

        /// The highest offset currently committed to the in-memory commitlog.
        fn committed_offset(&self) -> Option<TxOffset> {
            self.commitlog.read().unwrap().max_committed_offset()
        }
    }

    impl Durability for ManualDurability {
        type TxData = Txdata;

        fn append_tx(&self, tx: Self::TxData) {
            let mut log = self.commitlog.write().unwrap();
            // If the append is rejected, flush and retry once; a second
            // rejection is a hard failure.
            if let Err(tx) = log.append(tx) {
                log.commit().expect("error flushing commitlog");
                log.append(tx).expect("should be able to append after flush");
            }
            log.commit().expect("error flushing commitlog");
        }

        fn durable_tx_offset(&self) -> spacetimedb_durability::DurableOffset {
            self.durable_offset.subscribe().into()
        }
    }

    impl Default for ManualDurability {
        fn default() -> Self {
            // Fresh in-memory commitlog; the durable offset starts unset.
            let commitlog = commitlog::Generic::open(repo::Memory::unlimited(), <_>::default()).unwrap();
            let (durable_offset, _rx) = watch::channel(None);
            Self {
                commitlog: Arc::new(RwLock::new(commitlog)),
                durable_offset,
            }
        }
    }

    /// An in-memory `RelationalDB` for testing
    fn relational_db() -> anyhow::Result<Arc<RelationalDB>> {
        let test_db = TestDB::in_memory()?;
        Ok(Arc::new(test_db.db))
    }

    /// An in-memory `RelationalDB` with `ManualDurability`.
    fn relational_db_with_manual_durability() -> anyhow::Result<(Arc<RelationalDB>, Arc<ManualDurability>)> {
        let dir = TempReplicaDir::new()?;
        let durability = Arc::new(ManualDurability::default());
        let persistence = Persistence {
            durability: durability.clone(),
            disk_size: Arc::new(|| Ok(<_>::default())),
            snapshots: None,
        };
        let db = TestDB::open_db(&dir, EmptyHistory::new(), Some(persistence), None, 0)?;
        Ok((Arc::new(db), durability))
    }

    /// A [SubscribeSingle] message for testing
    fn single_subscribe(sql: &str, query_id: u32) -> SubscribeSingle {
        let query_id = QueryId::new(query_id);
        SubscribeSingle {
            query: sql.into(),
            request_id: 0,
            query_id,
        }
    }

    /// A [SubscribeMulti] message for testing
    fn multi_subscribe(query_strings: &[&'static str], query_id: u32) -> SubscribeMulti {
        let query_strings = query_strings.iter().map(|&sql| sql.into()).collect();
        SubscribeMulti {
            query_strings,
            request_id: 0,
            query_id: QueryId::new(query_id),
        }
    }

    /// An [UnsubscribeMulti] message for testing
    fn multi_unsubscribe(query_id: u32) -> UnsubscribeMulti {
        let query_id = QueryId::new(query_id);
        UnsubscribeMulti { request_id: 0, query_id }
    }

    /// An [Unsubscribe] message for testing
    fn single_unsubscribe(query_id: u32) -> Unsubscribe {
        let query_id = QueryId::new(query_id);
        Unsubscribe { request_id: 0, query_id }
    }

    /// A dummy [ModuleEvent] for testing
    fn module_event() -> ModuleEvent {
        // A committed event carrying an empty database update.
        let status = EventStatus::Committed(DatabaseUpdate::default());
        ModuleEvent {
            timestamp: Timestamp::now(),
            caller_identity: Identity::ZERO,
            caller_connection_id: None,
            function_call: ModuleFunctionCall::default(),
            status,
            energy_quanta_used: EnergyQuanta { quanta: 0 },
            host_execution_duration: Duration::ZERO,
            request_id: None,
            timer: None,
        }
    }

    /// Create an [Identity] whose 32 bytes are all `v`.
    fn identity_from_u8(v: u8) -> Identity {
        let bytes = [v; 32];
        Identity::from_byte_array(bytes)
    }

    /// Create a [ConnectionId] whose 16 bytes are all `v`.
    fn connection_id_from_u8(v: u8) -> ConnectionId {
        let bytes = [v; 16];
        ConnectionId::from_be_byte_array(bytes)
    }

    /// Create a [ClientActorId] from a [u8].
    /// Calls [identity_from_u8] internally with the passed value.
    fn client_id_from_u8(v: u8) -> ClientActorId {
        let identity = identity_from_u8(v);
        let connection_id = connection_id_from_u8(v);
        ClientActorId {
            identity,
            connection_id,
            name: ClientName(v.into()),
        }
    }

    /// Build a dummy sender/receiver pair for a client with the given config.
    fn client_connection_with_config(
        client_id: ClientActorId,
        db: &RelationalDB,
        config: ClientConfig,
    ) -> (Arc<ClientConnectionSender>, ClientConnectionReceiver) {
        let (tx, rx) = ClientConnectionSender::dummy_with_channel(client_id, config, db.clone());
        (Arc::new(tx), rx)
    }

    /// Instantiate a client connection with compression
    fn client_connection_with_compression(
        client_id: ClientActorId,
        db: &RelationalDB,
        compression: Compression,
    ) -> (Arc<ClientConnectionSender>, ClientConnectionReceiver) {
        // Binary protocol, full tx updates, confirmed reads off.
        let config = ClientConfig {
            protocol: Protocol::Binary,
            compression,
            tx_update_full: true,
            confirmed_reads: false,
        };
        client_connection_with_config(client_id, db, config)
    }

    /// Instantiate a client connection
    ///
    /// Delegates to [`client_connection_with_compression`] with compression disabled.
    fn client_connection(
        client_id: ClientActorId,
        db: &RelationalDB,
    ) -> (Arc<ClientConnectionSender>, ClientConnectionReceiver) {
        client_connection_with_compression(client_id, db, Compression::None)
    }

    /// Instantiate a client connection with confirmed reads turned on or off.
    fn client_connection_with_confirmed_reads(
        client_id: ClientActorId,
        db: &RelationalDB,
        confirmed_reads: bool,
    ) -> (Arc<ClientConnectionSender>, ClientConnectionReceiver) {
        // Binary protocol, no compression, full tx updates.
        let config = ClientConfig {
            protocol: Protocol::Binary,
            compression: Compression::None,
            tx_update_full: true,
            confirmed_reads,
        };
        client_connection_with_config(client_id, db, config)
    }

    /// Insert rules into the RLS system table
    ///
    /// Pairs each table id with the corresponding rule; extra items on either
    /// side are ignored (zip semantics).
    fn insert_rls_rules(
        db: &RelationalDB,
        table_ids: impl IntoIterator<Item = TableId>,
        rules: impl IntoIterator<Item = &'static str>,
    ) -> anyhow::Result<()> {
        with_auto_commit(db, |tx| {
            let pairs = table_ids.into_iter().zip(rules);
            for (table_id, sql) in pairs {
                let row = ProductValue::from(StRowLevelSecurityRow {
                    table_id,
                    sql: sql.into(),
                });
                db.insert(tx, ST_ROW_LEVEL_SECURITY_ID, &row.to_bsatn_vec()?)?;
            }
            Ok(())
        })
    }

    /// Subscribe to a query as a client
    ///
    /// Increments `counter` and uses the new value as the query id.
    async fn subscribe_single(
        subs: &ModuleSubscriptions,
        auth: AuthCtx,
        sql: &'static str,
        sender: Arc<ClientConnectionSender>,
        counter: &mut u32,
    ) -> anyhow::Result<()> {
        *counter += 1;
        let request = single_subscribe(sql, *counter);
        subs.add_single_subscription(None, sender, auth, request, Instant::now(), None)
            .await?;
        Ok(())
    }

    /// Subscribe to a set of queries as a client
    ///
    /// Increments `counter` and uses the new value as the query id.
    async fn subscribe_multi(
        subs: &ModuleSubscriptions,
        auth: AuthCtx,
        queries: &[&'static str],
        sender: Arc<ClientConnectionSender>,
        counter: &mut u32,
    ) -> anyhow::Result<ExecutionMetrics> {
        *counter += 1;
        let request = multi_subscribe(queries, *counter);
        let metrics = subs
            .add_multi_subscription(None, sender, auth, request, Instant::now(), None)
            .await?
            .unwrap_or_default();
        Ok(metrics)
    }

    /// Unsubscribe from a single query
    fn unsubscribe_single(
        subs: &ModuleSubscriptions,
        auth: AuthCtx,
        sender: Arc<ClientConnectionSender>,
        query_id: u32,
    ) -> anyhow::Result<()> {
        let request = single_unsubscribe(query_id);
        subs.remove_single_subscription(sender, auth, request, Instant::now())?;
        Ok(())
    }

    /// Unsubscribe from a set of queries
    fn unsubscribe_multi(
        subs: &ModuleSubscriptions,
        auth: AuthCtx,
        sender: Arc<ClientConnectionSender>,
        query_id: u32,
    ) -> anyhow::Result<()> {
        let request = multi_unsubscribe(query_id);
        subs.remove_multi_subscription(sender, auth, request, Instant::now())?;
        Ok(())
    }

    /// Pull a message from receiver and assert that it is a `TxUpdate` with the expected rows
    ///
    /// Expects a BSATN-encoded update touching exactly one table (`table_id`).
    /// Rows are decoded using `schema` and tallied as a multiset: +1 per insert,
    /// -1 per delete, so rows inserted and deleted an equal number of times
    /// cancel out and must not appear in either expectation.
    async fn assert_tx_update_for_table(
        rx: impl Future<Output = Option<SerializableMessage>>,
        table_id: TableId,
        schema: &ProductType,
        inserts: impl IntoIterator<Item = ProductValue>,
        deletes: impl IntoIterator<Item = ProductValue>,
    ) {
        match rx.await {
            Some(SerializableMessage::TxUpdate(TransactionUpdateMessage {
                database_update:
                    SubscriptionUpdateMessage {
                        database_update: FormatSwitch::Bsatn(ws::DatabaseUpdate { mut tables }),
                        ..
                    },
                ..
            })) => {
                // Assume an update for only one table
                assert_eq!(tables.len(), 1);

                let table_update = tables.pop().unwrap();

                // We should not be sending empty updates to clients
                assert_ne!(table_update.num_rows, 0);

                // It should be the table we expect
                assert_eq!(table_update.table_id, table_id);

                // Net row counts: positive means inserted, negative means deleted.
                let mut rows_received: HashMap<ProductValue, i32> = HashMap::new();

                for uncompressed in table_update.updates {
                    let CompressableQueryUpdate::Uncompressed(table_update) = uncompressed else {
                        panic!("expected an uncompressed table update")
                    };

                    for row in table_update
                        .inserts
                        .into_iter()
                        .map(|bytes| ProductValue::decode(schema, &mut &*bytes).unwrap())
                    {
                        *rows_received.entry(row).or_insert(0) += 1;
                    }

                    for row in table_update
                        .deletes
                        .into_iter()
                        .map(|bytes| ProductValue::decode(schema, &mut &*bytes).unwrap())
                    {
                        *rows_received.entry(row).or_insert(0) -= 1;
                    }
                }

                // Rows with a positive net count must match the expected inserts.
                assert_eq!(
                    rows_received
                        .iter()
                        .filter(|(_, n)| n > &&0)
                        .map(|(row, _)| row)
                        .cloned()
                        .sorted()
                        .collect::<Vec<_>>(),
                    inserts.into_iter().sorted().collect::<Vec<_>>()
                );
                // Rows with a negative net count must match the expected deletes.
                assert_eq!(
                    rows_received
                        .iter()
                        .filter(|(_, n)| n < &&0)
                        .map(|(row, _)| row)
                        .cloned()
                        .sorted()
                        .collect::<Vec<_>>(),
                    deletes.into_iter().sorted().collect::<Vec<_>>()
                );
            }
            Some(msg) => panic!("expected a TxUpdate, but got {msg:#?}"),
            None => panic!("The receiver closed due to an error"),
        }
    }

    /// Assert that the future `f` completes only after `durability` is marked
    /// durable.
    ///
    /// Namely:
    ///
    /// - assert that polling `f` once returns [`Poll::Pending`]
    /// - call `durability.mark_durable()`
    /// - assert that polling `f` returns [`Poll::Ready`].
    ///
    async fn assert_after_durable(durability: &ManualDurability, f: impl Future<Output: fmt::Debug>) {
        let mut fut = pin!(f);
        // Before the durable offset advances, the future must not resolve.
        assert_matches!(futures::poll!(&mut fut), Poll::Pending);
        // Marking durability should unblock it on the very next poll.
        durability.mark_durable();
        assert_matches!(futures::poll!(fut), Poll::Ready(_));
    }

    /// Apply `deletes` then `inserts` in one mutable transaction, commit it,
    /// and broadcast the resulting event to subscribers.
    ///
    /// Returns the execution metrics recorded for the commit.
    fn commit_tx(
        db: &RelationalDB,
        subs: &ModuleSubscriptions,
        deletes: impl IntoIterator<Item = (TableId, ProductValue)>,
        inserts: impl IntoIterator<Item = (TableId, ProductValue)>,
    ) -> anyhow::Result<ExecutionMetrics> {
        let mut tx = begin_mut_tx(db);
        // Deletes are applied before inserts, so a row "update" can be
        // expressed as a delete of the old value plus an insert of the new.
        for (table_id, row) in deletes {
            tx.delete_product_value(table_id, &row)?;
        }
        for (table_id, row) in inserts {
            let bytes = bsatn::to_vec(&row)?;
            db.insert(&mut tx, table_id, &bytes)?;
        }

        let outcome = commit_and_broadcast_event(subs, None, module_event(), tx);
        Ok(outcome.metrics)
    }

    /// Verify the execution metrics reported when a client first subscribes
    /// to a point query over an indexed table with a single matching row.
    #[test]
    fn test_subscribe_metrics() -> anyhow::Result<()> {
        let db = relational_db()?;

        let client_id = client_id_from_u8(1);
        let (sender, _) = client_connection(client_id, &db);

        let (subs, _runtime) = ModuleSubscriptions::for_test_new_runtime(db.clone());

        // Create a table `t` indexed on `id` and seed it with one row
        let table_id = db.create_table_for_test("t", &[("id", AlgebraicType::U64)], &[0.into()])?;
        with_auto_commit(&db, |tx| -> anyhow::Result<_> {
            db.insert(tx, table_id, &bsatn::to_vec(&product![1_u64])?)?;
            Ok(())
        })?;

        // Compile a point query that will be answered via the index
        let auth = AuthCtx::for_testing();
        let sql = "select * from t where id = 1";
        let read_tx = begin_tx(&db);
        let plan = Arc::new(compile_read_only_query(&auth, &read_tx, sql)?);

        let (_, metrics) = subs.evaluate_queries(sender, &[plan], &read_tx, &auth, TableUpdateType::Subscribe)?;

        // We only probe the index once
        assert_eq!(metrics.index_seeks, 1);
        // We scan a single u64 (8 bytes) when serializing the result
        assert_eq!(metrics.bytes_scanned, 8);
        // Subscriptions are read-only
        assert_eq!(metrics.bytes_written, 0);
        // Bytes scanned and bytes sent will always be the same for an initial subscription,
        // because a subscription is initiated by a single client.
        assert_eq!(metrics.bytes_sent_to_clients, 8);

        // Rows scanned depends on how many operators the plan uses,
        // so only assert that it is non-zero.
        assert!(metrics.rows_scanned > 0);
        Ok(())
    }

    /// Assert that `result` is a subscription error whose message references `sql`.
    fn check_subscription_err(sql: &str, result: Option<SerializableMessage>) {
        match result {
            Some(SerializableMessage::Subscription(SubscriptionMessage {
                result: SubscriptionResult::Error(SubscriptionError { message, .. }),
                ..
            })) => {
                // The error text should point back at the offending query
                assert!(
                    message.contains(sql),
                    "Expected error message to contain the SQL query: {sql}, but got: {message}",
                );
            }
            other => panic!("Expected a subscription error message, but got: {other:?}"),
        }
    }

    /// Test that clients receive error messages on subscribe
    #[tokio::test]
    async fn subscribe_single_error() -> anyhow::Result<()> {
        let db = relational_db()?;

        // Connect a single client
        let client_id = client_id_from_u8(1);
        let (sender, mut rx) = client_connection(client_id, &db);

        let auth = AuthCtx::new(db.owner_identity(), client_id.identity);
        let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());

        db.create_table_for_test("t", &[("x", AlgebraicType::U8)], &[])?;

        // `r` is not a table in scope, so compiling this query must fail
        let sql = "select r.* from t";
        subscribe_single(&subs, auth, sql, sender, &mut 0).await?;

        // The compilation failure should be reported back to the client
        check_subscription_err(sql, rx.recv().await);

        Ok(())
    }

    /// Test that clients receive error messages on subscribe
    #[tokio::test]
    async fn subscribe_multi_error() -> anyhow::Result<()> {
        let db = relational_db()?;

        // Connect a single client
        let client_id = client_id_from_u8(1);
        let (sender, mut rx) = client_connection(client_id, &db);

        let auth = AuthCtx::new(db.owner_identity(), client_id.identity);
        let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());

        db.create_table_for_test("t", &[("x", AlgebraicType::U8)], &[])?;

        // `r` is not a table in scope, so compiling this query must fail
        let sql = "select r.* from t";
        subscribe_multi(&subs, auth, &[sql], sender, &mut 0).await?;

        // The compilation failure should be reported back to the client
        check_subscription_err(sql, rx.recv().await);

        Ok(())
    }

    /// Test that clients receive error messages on unsubscribe
    ///
    /// The unsubscribe is made to fail by dropping the index that the cached
    /// query plan depends on (see the inline comments at the bottom).
    #[tokio::test]
    async fn unsubscribe_single_error() -> anyhow::Result<()> {
        let db = relational_db()?;

        // Establish a single client connection
        let client_id = client_id_from_u8(1);
        let (tx, mut rx) = client_connection(client_id, &db);

        let auth = AuthCtx::new(db.owner_identity(), client_id.identity);
        let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());

        // Create a table `t` with an index on `id`
        let table_id = db.create_table_for_test("t", &[("id", AlgebraicType::U8)], &[0.into()])?;
        // Fetch the id of that index so we can drop it later
        let index_id = with_read_only(&db, |tx| {
            db.schema_for_table(&*tx, table_id).map(|schema| {
                schema
                    .indexes
                    .first()
                    .map(|index_schema| index_schema.index_id)
                    .unwrap()
            })
        })?;

        let mut query_id = 0;

        // Subscribe to `t` with a point query on the indexed column
        let sql = "select * from t where id = 1";
        subscribe_single(&subs, auth.clone(), sql, tx.clone(), &mut query_id).await?;

        // The initial subscription should succeed
        assert!(matches!(
            rx.recv().await,
            Some(SerializableMessage::Subscription(SubscriptionMessage {
                result: SubscriptionResult::Subscribe(..),
                ..
            }))
        ));

        // Remove the index from `id`
        with_auto_commit(&db, |tx| db.drop_index(tx, index_id))?;

        // Unsubscribe from `t`
        unsubscribe_single(&subs, auth, tx, query_id)?;

        // Why does the unsubscribe fail?
        // This relies on some knowledge of the underlying implementation.
        // Specifically that we do not recompile queries on unsubscribe.
        // We execute the cached plan which in this case is an index scan.
        // The index no longer exists, and therefore it fails.
        check_subscription_err(sql, rx.recv().await);

        Ok(())
    }

    /// Test that clients receive error messages on unsubscribe
    ///
    /// The unsubscribe is made to fail by dropping the index that the cached
    /// query plan depends on (see the inline comments at the bottom).
    ///
    /// Needs a multi-threaded tokio runtime so that the module subscription worker can run in parallel.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn unsubscribe_multi_error() -> anyhow::Result<()> {
        let db = relational_db()?;

        // Establish a single client connection
        let client_id = client_id_from_u8(1);
        let (tx, mut rx) = client_connection(client_id, &db);

        let auth = AuthCtx::new(db.owner_identity(), client_id.identity);
        let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());

        // Create a table `t` with an index on `id`
        let table_id = db.create_table_for_test("t", &[("id", AlgebraicType::U8)], &[0.into()])?;
        // Fetch the id of that index so we can drop it later
        let index_id = with_read_only(&db, |tx| {
            db.schema_for_table(&*tx, table_id).map(|schema| {
                schema
                    .indexes
                    .first()
                    .map(|index_schema| index_schema.index_id)
                    .unwrap()
            })
        })?;

        // Seed `t` with a row (presumably so the unsubscribe has rows to
        // evaluate the cached plan against — TODO confirm)
        commit_tx(&db, &subs, [], [(table_id, product![0_u8])])?;

        let mut query_id = 0;

        // Subscribe to `t` with a point query on the indexed column
        let sql = "select * from t where id = 1";
        subscribe_multi(&subs, auth.clone(), &[sql], tx.clone(), &mut query_id).await?;

        // The initial subscription should succeed
        assert!(matches!(
            rx.recv().await,
            Some(SerializableMessage::Subscription(SubscriptionMessage {
                result: SubscriptionResult::SubscribeMulti(..),
                ..
            }))
        ));

        // Remove the index from `id`
        with_auto_commit(&db, |tx| db.drop_index(tx, index_id))?;

        // Unsubscribe from `t`
        unsubscribe_multi(&subs, auth, tx, query_id)?;

        // Why does the unsubscribe fail?
        // This relies on some knowledge of the underlying implementation.
        // Specifically that we do not recompile queries on unsubscribe.
        // We execute the cached plan which in this case is an index scan.
        // The index no longer exists, and therefore it fails.
        check_subscription_err(sql, rx.recv().await);

        Ok(())
    }

    /// Test that clients receive error messages on tx updates
    ///
    /// The incremental update is made to fail by dropping an index that the
    /// cached join plan depends on (see the inline comments at the bottom).
    #[tokio::test]
    async fn tx_update_error() -> anyhow::Result<()> {
        let db = relational_db()?;

        // Establish a single client connection
        let client_id = client_id_from_u8(1);
        let (tx, mut rx) = client_connection(client_id, &db);

        let auth = AuthCtx::new(db.owner_identity(), client_id.identity);
        let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());

        // Create two tables `t` and `s` with indexes on their `id` columns
        let t_id = db.create_table_for_test("t", &[("id", AlgebraicType::U8)], &[0.into()])?;
        let s_id = db.create_table_for_test("s", &[("id", AlgebraicType::U8)], &[0.into()])?;
        // Fetch the id of the index on `s` so we can drop it later
        let index_id = with_read_only(&db, |tx| {
            db.schema_for_table(&*tx, s_id).map(|schema| {
                schema
                    .indexes
                    .first()
                    .map(|index_schema| index_schema.index_id)
                    .unwrap()
            })
        })?;
        // Subscribe to a join of `t` and `s`
        let sql = "select t.* from t join s on t.id = s.id";
        subscribe_single(&subs, auth, sql, tx, &mut 0).await?;

        // The initial subscription should succeed
        assert!(matches!(
            rx.recv().await,
            Some(SerializableMessage::Subscription(SubscriptionMessage {
                result: SubscriptionResult::Subscribe(..),
                ..
            }))
        ));

        // Remove the index from `s`
        with_auto_commit(&db, |tx| db.drop_index(tx, index_id))?;

        // Start a new transaction and insert a new row into `t`
        let mut tx = begin_mut_tx(&db);
        db.insert(&mut tx, t_id, &bsatn::to_vec(&product![2_u8])?)?;

        // The commit itself succeeds; only the incremental query evaluation fails
        assert!(matches!(
            subs.commit_and_broadcast_event(None, module_event(), tx),
            Ok(Ok(_))
        ));

        // Why does the update fail?
        // This relies on some knowledge of the underlying implementation.
        // Specifically, plans are cached on the initial subscribe.
        // Hence we execute a cached plan which happens to be an index join.
        // We've removed the index on `s`, and therefore it fails.
        check_subscription_err(sql, rx.recv().await);

        Ok(())
    }

    /// Test that two clients can subscribe to a parameterized query and get the correct rows.
    ///
    /// The `:sender` parameter is bound to the identity of the subscribing
    /// client, so the same SQL text yields different rows per client.
    #[tokio::test]
    async fn test_parameterized_subscription() -> anyhow::Result<()> {
        let db = relational_db()?;

        // Create identities for two different clients
        let id_for_a = identity_from_u8(1);
        let id_for_b = identity_from_u8(2);

        let client_id_for_a = client_id_from_u8(1);
        let client_id_for_b = client_id_from_u8(2);

        // Establish a connection for each client
        let (tx_for_a, mut rx_for_a) = client_connection(client_id_for_a, &db);
        let (tx_for_b, mut rx_for_b) = client_connection(client_id_for_b, &db);

        let auth_for_a = AuthCtx::new(db.owner_identity(), client_id_for_a.identity);
        let auth_for_b = AuthCtx::new(db.owner_identity(), client_id_for_b.identity);

        let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());

        // A single-column table holding identities
        let schema = [("identity", AlgebraicType::identity())];

        let table_id = db.create_table_for_test("t", &schema, &[])?;

        let mut query_ids = 0;

        // Have each client subscribe to the same parameterized query.
        // Each client should receive different rows.
        subscribe_multi(
            &subs,
            auth_for_a,
            &["select * from t where identity = :sender"],
            tx_for_a,
            &mut query_ids,
        )
        .await?;
        subscribe_multi(
            &subs,
            auth_for_b,
            &["select * from t where identity = :sender"],
            tx_for_b,
            &mut query_ids,
        )
        .await?;

        // Wait for both subscriptions
        assert!(matches!(
            rx_for_a.recv().await,
            Some(SerializableMessage::Subscription(_))
        ));
        assert!(matches!(
            rx_for_b.recv().await,
            Some(SerializableMessage::Subscription(_))
        ));

        // Insert two identities - one for each caller - into the table
        let mut tx = begin_mut_tx(&db);
        db.insert(&mut tx, table_id, &bsatn::to_vec(&product![id_for_a])?)?;
        db.insert(&mut tx, table_id, &bsatn::to_vec(&product![id_for_b])?)?;

        assert!(matches!(
            subs.commit_and_broadcast_event(None, module_event(), tx),
            Ok(Ok(_))
        ));

        let schema = ProductType::from([AlgebraicType::identity()]);

        // Both clients should only receive their identities and not the other's.
        assert_tx_update_for_table(rx_for_a.recv(), table_id, &schema, [product![id_for_a]], []).await;
        assert_tx_update_for_table(rx_for_b.recv(), table_id, &schema, [product![id_for_b]], []).await;
        Ok(())
    }

    /// Test that two clients can subscribe to a table with RLS rules and get the correct rows
    ///
    /// Access to `w` is gated by two join rules - one via `u` and one via `v` -
    /// each of which bottoms out in a `:sender`-parameterized rule, so each
    /// client should only see the `w` rows tied to its own identity.
    #[tokio::test]
    async fn test_rls_subscription() -> anyhow::Result<()> {
        let db = relational_db()?;

        // Create identities for two different clients
        let id_for_a = identity_from_u8(1);
        let id_for_b = identity_from_u8(2);

        let client_id_for_a = client_id_from_u8(1);
        let client_id_for_b = client_id_from_u8(2);

        // Establish a connection for each client
        let (tx_for_a, mut rx_for_a) = client_connection(client_id_for_a, &db);
        let (tx_for_b, mut rx_for_b) = client_connection(client_id_for_b, &db);

        let auth_for_a = AuthCtx::new(db.owner_identity(), client_id_for_a.identity);
        let auth_for_b = AuthCtx::new(db.owner_identity(), client_id_for_b.identity);

        let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());

        // All three tables share the same single-column identity schema
        let schema = [("id", AlgebraicType::identity())];

        let u_id = db.create_table_for_test("u", &schema, &[0.into()])?;
        let v_id = db.create_table_for_test("v", &schema, &[0.into()])?;
        let w_id = db.create_table_for_test("w", &schema, &[0.into()])?;

        // Note: `w_id` appears twice because `w` gets two RLS rules
        insert_rls_rules(
            &db,
            [u_id, v_id, w_id, w_id],
            [
                "select * from u where id = :sender",
                "select * from v where id = :sender",
                "select w.* from u join w on u.id = w.id",
                "select w.* from v join w on v.id = w.id",
            ],
        )?;

        let mut query_ids = 0;

        // Have each client subscribe to `w`.
        // Because `w` is gated using parameterized RLS rules,
        // each client should receive different rows.
        subscribe_multi(&subs, auth_for_a, &["select * from w"], tx_for_a, &mut query_ids).await?;
        subscribe_multi(&subs, auth_for_b, &["select * from w"], tx_for_b, &mut query_ids).await?;

        // Wait for both subscriptions
        assert!(matches!(
            rx_for_a.recv().await,
            Some(SerializableMessage::Subscription(_))
        ));
        assert!(matches!(
            rx_for_b.recv().await,
            Some(SerializableMessage::Subscription(_))
        ));

        // Insert a row into `u` for client "a".
        // Insert a row into `v` for client "b".
        // Insert a row into `w` for both.
        let mut tx = begin_mut_tx(&db);
        db.insert(&mut tx, u_id, &bsatn::to_vec(&product![id_for_a])?)?;
        db.insert(&mut tx, v_id, &bsatn::to_vec(&product![id_for_b])?)?;
        db.insert(&mut tx, w_id, &bsatn::to_vec(&product![id_for_a])?)?;
        db.insert(&mut tx, w_id, &bsatn::to_vec(&product![id_for_b])?)?;

        assert!(matches!(
            subs.commit_and_broadcast_event(None, module_event(), tx),
            Ok(Ok(_))
        ));

        let schema = ProductType::from([AlgebraicType::identity()]);

        // Both clients should only receive their identities and not the other's.
        assert_tx_update_for_table(rx_for_a.recv(), w_id, &schema, [product![id_for_a]], []).await;
        assert_tx_update_for_table(rx_for_b.recv(), w_id, &schema, [product![id_for_b]], []).await;
        Ok(())
    }

    /// Test that a client and the database owner can subscribe to the same query
    ///
    /// The owner is expected to see all rows while the client only sees rows
    /// matching its own identity, i.e. RLS filtering does not apply to the owner.
    #[tokio::test]
    async fn test_rls_for_owner() -> anyhow::Result<()> {
        let db = relational_db()?;

        // Client `a` is the database owner; client `b` is a regular client
        let client_id_for_a = client_id_from_u8(0);
        let client_id_for_b = client_id_from_u8(1);

        // Establish a connection for owner and client
        let (tx_for_a, mut rx_for_a) = client_connection(client_id_for_a, &db);
        let (tx_for_b, mut rx_for_b) = client_connection(client_id_for_b, &db);

        let auth_for_a = AuthCtx::new(db.owner_identity(), client_id_for_a.identity);
        let auth_for_b = AuthCtx::new(db.owner_identity(), client_id_for_b.identity);

        let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());

        // Create table `t`
        let table_id = db.create_table_for_test("t", &[("id", AlgebraicType::identity())], &[0.into()])?;

        // Restrict access to `t`
        insert_rls_rules(&db, [table_id], ["select * from t where id = :sender"])?;

        let mut query_ids = 0;

        // Have owner and client subscribe to `t`
        subscribe_multi(&subs, auth_for_a, &["select * from t"], tx_for_a, &mut query_ids).await?;
        subscribe_multi(&subs, auth_for_b, &["select * from t"], tx_for_b, &mut query_ids).await?;

        // Wait for both subscriptions
        assert_matches!(
            rx_for_a.recv().await,
            Some(SerializableMessage::Subscription(SubscriptionMessage {
                result: SubscriptionResult::SubscribeMulti(_),
                ..
            }))
        );
        assert_matches!(
            rx_for_b.recv().await,
            Some(SerializableMessage::Subscription(SubscriptionMessage {
                result: SubscriptionResult::SubscribeMulti(_),
                ..
            }))
        );

        let schema = ProductType::from([AlgebraicType::identity()]);

        let id_for_b = identity_from_u8(1);
        let id_for_c = identity_from_u8(2);

        commit_tx(
            &db,
            &subs,
            [],
            [
                // Insert an identity for client `b` plus a random identity
                (table_id, product![id_for_b]),
                (table_id, product![id_for_c]),
            ],
        )?;

        assert_tx_update_for_table(
            rx_for_a.recv(),
            table_id,
            &schema,
            // The owner should receive both identities
            [product![id_for_b], product![id_for_c]],
            [],
        )
        .await;

        assert_tx_update_for_table(
            rx_for_b.recv(),
            table_id,
            &schema,
            // Client `b` should only receive its identity
            [product![id_for_b]],
            [],
        )
        .await;

        Ok(())
    }

    /// Test that we do not send empty updates to clients
    ///
    /// Commits one transaction whose rows do not match the subscribed query
    /// and one whose rows do; the client must only receive an update for the
    /// latter.
    #[tokio::test]
    async fn test_no_empty_updates() -> anyhow::Result<()> {
        let db = relational_db()?;

        let client_id = client_id_from_u8(1);

        // Establish a client connection
        let (tx, mut rx) = client_connection(client_id, &db);

        let auth = AuthCtx::new(db.owner_identity(), client_id.identity);
        let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());

        let schema = [("x", AlgebraicType::U8)];

        let t_id = db.create_table_for_test("t", &schema, &[])?;

        // Subscribe to rows of `t` where `x` is 0
        subscribe_multi(&subs, auth, &["select * from t where x = 0"], tx, &mut 0).await?;

        // Wait to receive the initial subscription message
        assert!(matches!(rx.recv().await, Some(SerializableMessage::Subscription(_))));

        // Insert a row that does not match the query
        let mut tx = begin_mut_tx(&db);
        db.insert(&mut tx, t_id, &bsatn::to_vec(&product![1_u8])?)?;

        assert!(matches!(
            subs.commit_and_broadcast_event(None, module_event(), tx),
            Ok(Ok(_))
        ));

        // Insert a row that does match the query
        let mut tx = begin_mut_tx(&db);
        db.insert(&mut tx, t_id, &bsatn::to_vec(&product![0_u8])?)?;

        assert!(matches!(
            subs.commit_and_broadcast_event(None, module_event(), tx),
            Ok(Ok(_))
        ));

        let schema = ProductType::from([AlgebraicType::U8]);

        // If the server sends empty updates, this assertion will fail,
        // because we will receive one for the first transaction.
        assert_tx_update_for_table(rx.recv(), t_id, &schema, [product![0_u8]], []).await;
        Ok(())
    }

    /// Test that we do not compress within a [SubscriptionMessage].
    /// The message itself is compressed before being sent over the wire,
    /// but we don't care about that for this test.
    ///
    /// Needs a multi-threaded tokio runtime so that the module subscription worker can run in parallel.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn test_no_compression_for_subscribe() -> anyhow::Result<()> {
        let db = relational_db()?;

        let client_id = client_id_from_u8(1);
        // Establish a client connection with compression
        let (tx, mut rx) = client_connection_with_compression(client_id, &db, Compression::Brotli);

        let auth = AuthCtx::new(db.owner_identity(), client_id.identity);
        let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());

        let table_id = db.create_table_for_test("t", &[("x", AlgebraicType::U64)], &[])?;

        let mut inserts = vec![];

        for i in 0..16_000u64 {
            inserts.push((table_id, product![i]));
        }

        // Insert a lot of rows into `t`.
        // We want to insert enough to cross any threshold there might be for compression.
        commit_tx(&db, &subs, [], inserts)?;

        // Subscribe to the entire table
        subscribe_multi(&subs, auth, &["select * from t"], tx, &mut 0).await?;

        // Assert the table updates within this message are all uncompressed
        match rx.recv().await {
            Some(SerializableMessage::Subscription(SubscriptionMessage {
                result:
                    SubscriptionResult::SubscribeMulti(SubscriptionData {
                        data: FormatSwitch::Bsatn(ws::DatabaseUpdate { tables }),
                    }),
                ..
            })) => {
                // Every query update in every table update must be Uncompressed
                assert!(tables.iter().all(|TableUpdate { updates, .. }| updates
                    .iter()
                    .all(|query_update| matches!(query_update, CompressableQueryUpdate::Uncompressed(_)))));
            }
            Some(_) => panic!("unexpected message from subscription"),
            None => panic!("channel unexpectedly closed"),
        };

        Ok(())
    }

    /// Test that we receive subscription updates for DML
    ///
    /// Runs an INSERT, an UPDATE, and a DELETE statement via the SQL
    /// interface and checks the client receives the corresponding deltas.
    #[tokio::test]
    async fn test_updates_for_dml() -> anyhow::Result<()> {
        let db = relational_db()?;

        // Establish a client connection
        let client_id = client_id_from_u8(1);
        let (tx, mut rx) = client_connection(client_id, &db);

        let auth = AuthCtx::new(db.owner_identity(), client_id.identity);
        let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());
        let schema = [("x", AlgebraicType::U8), ("y", AlgebraicType::U8)];
        let t_id = db.create_table_for_test("t", &schema, &[])?;

        // Subscribe to `t`
        subscribe_multi(&subs, auth, &["select * from t"], tx, &mut 0).await?;

        // Wait to receive the initial subscription message
        assert_matches!(rx.recv().await, Some(SerializableMessage::Subscription(_)));

        let schema = ProductType::from([AlgebraicType::U8, AlgebraicType::U8]);

        // Only the owner can invoke DML commands
        let auth = AuthCtx::new(identity_from_u8(0), identity_from_u8(0));

        run(
            &db,
            "INSERT INTO t (x, y) VALUES (0, 1)",
            auth.clone(),
            Some(&subs),
            None,
            &mut vec![],
        )
        .await?;

        // Client should receive insert
        assert_tx_update_for_table(rx.recv(), t_id, &schema, [product![0_u8, 1_u8]], []).await;

        run(
            &db,
            "UPDATE t SET y=2 WHERE x=0",
            auth.clone(),
            Some(&subs),
            None,
            &mut vec![],
        )
        .await?;

        // Client should receive update, i.e. a delete of the old row
        // plus an insert of the new row
        assert_tx_update_for_table(rx.recv(), t_id, &schema, [product![0_u8, 2_u8]], [product![0_u8, 1_u8]]).await;

        run(&db, "DELETE FROM t WHERE x=0", auth, Some(&subs), None, &mut vec![]).await?;

        // Client should receive delete
        assert_tx_update_for_table(rx.recv(), t_id, &schema, [], [product![0_u8, 2_u8]]).await;
        Ok(())
    }

    /// Test that we do not compress within a [TransactionUpdateMessage].
    /// The message itself is compressed before being sent over the wire,
    /// but we don't care about that for this test.
    #[tokio::test]
    async fn test_no_compression_for_update() -> anyhow::Result<()> {
        let db = relational_db()?;

        // Establish a client connection with compression
        let client_id = client_id_from_u8(1);
        let (tx, mut rx) = client_connection_with_compression(client_id, &db, Compression::Brotli);

        let auth = AuthCtx::new(db.owner_identity(), client_id.identity);
        let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());

        let table_id = db.create_table_for_test("t", &[("x", AlgebraicType::U64)], &[])?;

        let mut inserts = vec![];

        for i in 0..16_000u64 {
            inserts.push((table_id, product![i]));
        }

        // Subscribe to the entire table
        subscribe_multi(&subs, auth, &["select * from t"], tx, &mut 0).await?;

        // Wait to receive the initial subscription message
        assert!(matches!(rx.recv().await, Some(SerializableMessage::Subscription(_))));

        // Insert a lot of rows into `t`.
        // We want to insert enough to cross any threshold there might be for compression.
        commit_tx(&db, &subs, [], inserts)?;

        // Assert the table updates within this message are all uncompressed
        match rx.recv().await {
            Some(SerializableMessage::TxUpdate(TransactionUpdateMessage {
                database_update:
                    SubscriptionUpdateMessage {
                        database_update: FormatSwitch::Bsatn(ws::DatabaseUpdate { tables }),
                        ..
                    },
                ..
            })) => {
                // Every query update in every table update must be Uncompressed
                assert!(tables.iter().all(|TableUpdate { updates, .. }| updates
                    .iter()
                    .all(|query_update| matches!(query_update, CompressableQueryUpdate::Uncompressed(_)))));
            }
            Some(_) => panic!("unexpected message from subscription"),
            None => panic!("channel unexpectedly closed"),
        };

        Ok(())
    }

    /// In this test we subscribe to a join query, update the lhs table,
    /// and assert that the server sends the correct delta to the client.
    #[tokio::test]
    async fn test_update_for_join() -> anyhow::Result<()> {
        async fn test_subscription_updates(queries: &[&'static str]) -> anyhow::Result<()> {
            let db = relational_db()?;

            // Establish a client connection
            let client_id = client_id_from_u8(1);
            let (sender, mut rx) = client_connection(client_id, &db);

            let auth = AuthCtx::new(db.owner_identity(), client_id.identity);
            let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());

            let p_schema = [("id", AlgebraicType::U64), ("signed_in", AlgebraicType::Bool)];
            let l_schema = [
                ("id", AlgebraicType::U64),
                ("x", AlgebraicType::U64),
                ("z", AlgebraicType::U64),
            ];

            let p_id = db.create_table_for_test("p", &p_schema, &[0.into()])?;
            let l_id = db.create_table_for_test("l", &l_schema, &[0.into()])?;

            subscribe_multi(&subs, auth, queries, sender, &mut 0).await?;

            assert!(matches!(rx.recv().await, Some(SerializableMessage::Subscription(_))));

            // Insert two matching player rows
            commit_tx(
                &db,
                &subs,
                [],
                [
                    (p_id, product![1_u64, true]),
                    (p_id, product![2_u64, true]),
                    (l_id, product![1_u64, 2_u64, 2_u64]),
                    (l_id, product![2_u64, 3_u64, 3_u64]),
                ],
            )?;

            let schema = ProductType::from(p_schema);

            // We should receive both matching player rows
            assert_tx_update_for_table(
                rx.recv(),
                p_id,
                &schema,
                [product![1_u64, true], product![2_u64, true]],
                [],
            )
            .await;

            // Update one of the matching player rows
            commit_tx(
                &db,
                &subs,
                [(p_id, product![2_u64, true])],
                [(p_id, product![2_u64, false])],
            )?;

            // We should receive an update for it because it is still matching
            assert_tx_update_for_table(
                rx.recv(),
                p_id,
                &schema,
                [product![2_u64, false]],
                [product![2_u64, true]],
            )
            .await;

            // Update the same matching player row
            commit_tx(
                &db,
                &subs,
                [(p_id, product![2_u64, false])],
                [(p_id, product![2_u64, true])],
            )?;

            // We should receive an update for it because it is still matching
            assert_tx_update_for_table(
                rx.recv(),
                p_id,
                &schema,
                [product![2_u64, true]],
                [product![2_u64, false]],
            )
            .await;

            Ok(())
        }

        test_subscription_updates(&[
            "select * from p where id = 1",
            "select p.* from p join l on p.id = l.id where l.x > 0 and l.x < 5 and l.z > 0 and l.z < 5",
        ])
        .await?;
        test_subscription_updates(&[
            "select * from p where id = 1",
            "select p.* from p join l on p.id = l.id where 0 < l.x and l.x < 5 and 0 < l.z and l.z < 5",
        ])
        .await?;
        test_subscription_updates(&[
            "select * from p where id = 1",
            "select p.* from p join l on p.id = l.id where l.x > 0 and l.x < 5 and l.x > 0 and l.z < 5 and l.id != 1",
        ])
        .await?;
        test_subscription_updates(&[
            "select * from p where id = 1",
            "select p.* from p join l on p.id = l.id where 0 < l.x and l.x < 5 and 0 < l.z and l.z < 5 and l.id != 1",
        ])
        .await?;

        Ok(())
    }

    /// Test that we do not evaluate queries that we know will not match table update rows
    ///
    /// Two clients subscribe to semijoins of `u` against `v`, each filtering on a
    /// different value of `v.x`. After each commit we inspect the returned tx metrics:
    /// `delta_queries_evaluated` counts how many subscribed queries were actually run
    /// against the delta, and `delta_queries_matched` counts how many produced rows.
    /// Pruning should skip queries whose `v.x` filter cannot match the updated rows.
    ///
    /// Needs a multi-threaded tokio runtime so that the module subscription worker can run in parallel.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn test_query_pruning() -> anyhow::Result<()> {
        let db = relational_db()?;

        let client_id_a = client_id_from_u8(1);
        let client_id_b = client_id_from_u8(2);
        // Establish a connection for each client
        let (tx_for_a, mut rx_for_a) = client_connection(client_id_a, &db);
        let (tx_for_b, mut rx_for_b) = client_connection(client_id_b, &db);

        let auth_a = AuthCtx::new(db.owner_identity(), client_id_a.identity);
        let auth_b = AuthCtx::new(db.owner_identity(), client_id_b.identity);
        let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());

        // `u(i, a, b)` with an index on `i` (the join column).
        let u_id = db.create_table_for_test(
            "u",
            &[
                ("i", AlgebraicType::U64),
                ("a", AlgebraicType::U64),
                ("b", AlgebraicType::U64),
            ],
            &[0.into()],
        )?;
        // `v(i, x, y)` with indices on `i` and `x` (join and filter columns).
        let v_id = db.create_table_for_test(
            "v",
            &[
                ("i", AlgebraicType::U64),
                ("x", AlgebraicType::U64),
                ("y", AlgebraicType::U64),
            ],
            &[0.into(), 1.into()],
        )?;

        // Seed both tables before any subscriptions exist.
        commit_tx(
            &db,
            &subs,
            [],
            [
                (u_id, product![0u64, 1u64, 1u64]),
                (u_id, product![1u64, 2u64, 2u64]),
                (u_id, product![2u64, 3u64, 3u64]),
                (v_id, product![0u64, 4u64, 4u64]),
                (v_id, product![1u64, 5u64, 5u64]),
            ],
        )?;

        let mut query_ids = 0;

        // Returns (i: 0, a: 1, b: 1)
        subscribe_multi(
            &subs,
            auth_a.clone(),
            &[
                "select u.* from u join v on u.i = v.i where v.x = 4",
                "select u.* from u join v on u.i = v.i where v.x = 6",
            ],
            tx_for_a,
            &mut query_ids,
        )
        .await?;

        // Returns (i: 1, a: 2, b: 2)
        subscribe_multi(
            &subs,
            auth_b.clone(),
            &[
                "select u.* from u join v on u.i = v.i where v.x = 5",
                "select u.* from u join v on u.i = v.i where v.x = 7",
            ],
            tx_for_b,
            &mut query_ids,
        )
        .await?;

        // Wait for both subscriptions
        assert!(matches!(
            rx_for_a.recv().await,
            Some(SerializableMessage::Subscription(SubscriptionMessage {
                result: SubscriptionResult::SubscribeMulti(_),
                ..
            }))
        ));
        assert!(matches!(
            rx_for_b.recv().await,
            Some(SerializableMessage::Subscription(SubscriptionMessage {
                result: SubscriptionResult::SubscribeMulti(_),
                ..
            }))
        ));

        // Modify a single row in `v`: only `y` changes, and its `x` stays 5,
        // so only client B's `x = 5` query can possibly be affected.
        let metrics = commit_tx(
            &db,
            &subs,
            [(v_id, product![1u64, 5u64, 5u64])],
            [(v_id, product![1u64, 5u64, 6u64])],
        )?;

        // We should only have evaluated a single query
        assert_eq!(metrics.delta_queries_evaluated, 1);
        assert_eq!(metrics.delta_queries_matched, 0);

        // Insert a new row into `v` with `x = 6`: only client A's `x = 6` query
        // can match, and it now joins `u`'s row with `i = 2`.
        let metrics = commit_tx(&db, &subs, [], [(v_id, product![2u64, 6u64, 6u64])])?;

        assert_tx_update_for_table(
            rx_for_a.recv(),
            u_id,
            &ProductType::from([AlgebraicType::U64, AlgebraicType::U64, AlgebraicType::U64]),
            [product![2u64, 3u64, 3u64]],
            [],
        )
        .await;

        // We should only have evaluated a single query
        assert_eq!(metrics.delta_queries_evaluated, 1);
        assert_eq!(metrics.delta_queries_matched, 1);

        // Modify a matching row in `u`. Updates to `u` cannot be pruned by the
        // `v.x` filter, so every subscribed query must be evaluated.
        let metrics = commit_tx(
            &db,
            &subs,
            [(u_id, product![1u64, 2u64, 2u64])],
            [(u_id, product![1u64, 2u64, 3u64])],
        )?;

        assert_tx_update_for_table(
            rx_for_b.recv(),
            u_id,
            &ProductType::from([AlgebraicType::U64, AlgebraicType::U64, AlgebraicType::U64]),
            [product![1u64, 2u64, 3u64]],
            [product![1u64, 2u64, 2u64]],
        )
        .await;

        // We should have evaluated all of the queries
        assert_eq!(metrics.delta_queries_evaluated, 4);
        assert_eq!(metrics.delta_queries_matched, 1);

        // Insert a non-matching row in `u` (no `v` row with `i = 3`).
        let metrics = commit_tx(&db, &subs, [], [(u_id, product![3u64, 0u64, 0u64])])?;

        // We should have evaluated all of the queries
        assert_eq!(metrics.delta_queries_evaluated, 4);
        assert_eq!(metrics.delta_queries_matched, 0);

        Ok(())
    }

    /// Test that we do not evaluate queries that we know will not match row updates
    ///
    /// A single client subscribes to five semijoins of `u` against `v`, one per
    /// value of `v.x` in 1..=5. Each commit's metrics then show exactly which of
    /// those queries were evaluated: an update touching `v.x` values {a, b}
    /// should evaluate only the queries for `x = a` and `x = b`.
    #[tokio::test]
    async fn test_join_pruning() -> anyhow::Result<()> {
        let db = relational_db()?;

        let client_id = client_id_from_u8(1);
        let (tx, mut rx) = client_connection(client_id, &db);

        let auth = AuthCtx::new(db.owner_identity(), client_id.identity);
        let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());

        let u_id = db.create_table_for_test_with_the_works(
            "u",
            &[
                ("i", AlgebraicType::U64),
                ("a", AlgebraicType::U64),
                ("b", AlgebraicType::U64),
            ],
            &[0.into()],
            // The join column for this table does not have to be unique,
            // because pruning only requires us to probe the join index on `v`.
            &[],
            StAccess::Public,
        )?;
        // `v(i, x, y)` with indices on `i` and `x`, and a unique constraint on `i`.
        let v_id = db.create_table_for_test_with_the_works(
            "v",
            &[
                ("i", AlgebraicType::U64),
                ("x", AlgebraicType::U64),
                ("y", AlgebraicType::U64),
            ],
            &[0.into(), 1.into()],
            &[0.into()],
            StAccess::Public,
        )?;

        let schema = ProductType::from([AlgebraicType::U64, AlgebraicType::U64, AlgebraicType::U64]);

        // Seed `v` with rows (i, x, y) = (k, k, k) for k in 1..=5.
        commit_tx(
            &db,
            &subs,
            [],
            [
                (v_id, product![1u64, 1u64, 1u64]),
                (v_id, product![2u64, 2u64, 2u64]),
                (v_id, product![3u64, 3u64, 3u64]),
                (v_id, product![4u64, 4u64, 4u64]),
                (v_id, product![5u64, 5u64, 5u64]),
            ],
        )?;

        let mut query_ids = 0;

        // One query per `v.x` value so the metrics below identify exactly
        // which queries escaped pruning.
        subscribe_multi(
            &subs,
            auth,
            &[
                "select u.* from u join v on u.i = v.i where v.x = 1",
                "select u.* from u join v on u.i = v.i where v.x = 2",
                "select u.* from u join v on u.i = v.i where v.x = 3",
                "select u.* from u join v on u.i = v.i where v.x = 4",
                "select u.* from u join v on u.i = v.i where v.x = 5",
            ],
            tx,
            &mut query_ids,
        )
        .await?;

        assert_matches!(
            rx.recv().await,
            Some(SerializableMessage::Subscription(SubscriptionMessage {
                result: SubscriptionResult::SubscribeMulti(_),
                ..
            }))
        );

        // Insert a new row into `u` that joins with `x = 1`
        let metrics = commit_tx(&db, &subs, [], [(u_id, product![1u64, 2u64, 3u64])])?;

        assert_tx_update_for_table(rx.recv(), u_id, &schema, [product![1u64, 2u64, 3u64]], []).await;

        // We should only have evaluated a single query
        assert_eq!(metrics.delta_queries_evaluated, 1);
        assert_eq!(metrics.delta_queries_matched, 1);

        // UPDATE v SET y = 2 WHERE id = 1
        // `x` stays 1, so only the `x = 1` query is evaluated; the join output
        // is unchanged, so nothing matches.
        let metrics = commit_tx(
            &db,
            &subs,
            [(v_id, product![1u64, 1u64, 1u64])],
            [(v_id, product![1u64, 1u64, 2u64])],
        )?;

        // We should only have evaluated a single query
        assert_eq!(metrics.delta_queries_evaluated, 1);
        assert_eq!(metrics.delta_queries_matched, 0);

        // UPDATE v SET x = 2 WHERE id = 1
        // `x` moves 1 -> 2: the row leaves the `x = 1` query and enters `x = 2`,
        // so both are evaluated, and the client-visible update cancels out.
        let metrics = commit_tx(
            &db,
            &subs,
            [(v_id, product![1u64, 1u64, 2u64])],
            [(v_id, product![1u64, 2u64, 2u64])],
        )?;

        // Results in a no-op
        assert_tx_update_for_table(rx.recv(), u_id, &schema, [], []).await;

        // We should have evaluated queries for `x = 1` and `x = 2`
        assert_eq!(metrics.delta_queries_evaluated, 2);
        assert_eq!(metrics.delta_queries_matched, 2);

        // Insert new row into `u` that joins with `x = 3`
        // UPDATE v SET x = 4 WHERE id = 3
        let metrics = commit_tx(
            &db,
            &subs,
            [(v_id, product![3u64, 3u64, 3u64])],
            [(v_id, product![3u64, 4u64, 3u64]), (u_id, product![3u64, 4u64, 5u64])],
        )?;

        assert_tx_update_for_table(rx.recv(), u_id, &schema, [product![3u64, 4u64, 5u64]], []).await;

        // We should have evaluated queries for `x = 3` and `x = 4`
        assert_eq!(metrics.delta_queries_evaluated, 2);
        assert_eq!(metrics.delta_queries_matched, 1);

        // UPDATE v SET x = 0 WHERE id = 3
        // `x` moves 4 -> 0; no query filters on `x = 0`, so only `x = 4` runs,
        // and the joined `u` row is reported as deleted from the subscription.
        let metrics = commit_tx(
            &db,
            &subs,
            [(v_id, product![3u64, 4u64, 3u64])],
            [(v_id, product![3u64, 0u64, 3u64])],
        )?;

        assert_tx_update_for_table(rx.recv(), u_id, &schema, [], [product![3u64, 4u64, 5u64]]).await;

        // We should only have evaluated the query for `x = 4`
        assert_eq!(metrics.delta_queries_evaluated, 1);
        assert_eq!(metrics.delta_queries_matched, 1);

        // Insert new row into `u` that joins with `x = 5`
        // UPDATE v SET x = 6 WHERE id = 5
        // Should result in a no-op
        let metrics = commit_tx(
            &db,
            &subs,
            [(v_id, product![5u64, 5u64, 5u64])],
            [(v_id, product![5u64, 6u64, 6u64]), (u_id, product![5u64, 6u64, 7u64])],
        )?;

        // We should only have evaluated the query for `x = 5`
        assert_eq!(metrics.delta_queries_evaluated, 1);
        assert_eq!(metrics.delta_queries_matched, 0);

        Ok(())
    }

    /// Test that one client subscribing does not affect another
    #[tokio::test]
    async fn test_subscribe_distinct_queries_same_plan() -> anyhow::Result<()> {
        let db = relational_db()?;

        let client_id_a = client_id_from_u8(1);
        let client_id_b = client_id_from_u8(2);
        // Establish a connection for each client
        let (tx_for_a, mut rx_for_a) = client_connection(client_id_a, &db);
        let (tx_for_b, mut rx_for_b) = client_connection(client_id_b, &db);

        let auth_a = AuthCtx::new(db.owner_identity(), client_id_a.identity);
        let auth_b = AuthCtx::new(db.owner_identity(), client_id_b.identity);

        let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());

        let u_id = db.create_table_for_test_with_the_works(
            "u",
            &[
                ("i", AlgebraicType::U64),
                ("a", AlgebraicType::U64),
                ("b", AlgebraicType::U64),
            ],
            &[0.into()],
            // The join column for this table does not have to be unique,
            // because pruning only requires us to probe the join index on `v`.
            &[],
            StAccess::Public,
        )?;
        let v_id = db.create_table_for_test_with_the_works(
            "v",
            &[
                ("i", AlgebraicType::U64),
                ("x", AlgebraicType::U64),
                ("y", AlgebraicType::U64),
            ],
            &[0.into(), 1.into()],
            &[0.into()],
            StAccess::Public,
        )?;

        commit_tx(&db, &subs, [], [(v_id, product![1u64, 1u64, 1u64])])?;

        let mut query_ids = 0;

        // Both clients subscribe to the same query modulo whitespace
        subscribe_multi(
            &subs,
            auth_a,
            &["select u.* from u join v on u.i = v.i where v.x = 1"],
            tx_for_a,
            &mut query_ids,
        )
        .await?;
        subscribe_multi(
            &subs,
            auth_b,
            &["select u.* from u join v on u.i = v.i where v.x =  1"],
            tx_for_b.clone(),
            &mut query_ids,
        )
        .await?;

        // Wait for both subscriptions
        assert_matches!(
            rx_for_a.recv().await,
            Some(SerializableMessage::Subscription(SubscriptionMessage {
                result: SubscriptionResult::SubscribeMulti(_),
                ..
            }))
        );
        assert_matches!(
            rx_for_b.recv().await,
            Some(SerializableMessage::Subscription(SubscriptionMessage {
                result: SubscriptionResult::SubscribeMulti(_),
                ..
            }))
        );

        // Insert a new row into `u`
        commit_tx(&db, &subs, [], [(u_id, product![1u64, 0u64, 0u64])])?;

        assert_tx_update_for_table(
            rx_for_a.recv(),
            u_id,
            &ProductType::from([AlgebraicType::U64, AlgebraicType::U64, AlgebraicType::U64]),
            [product![1u64, 0u64, 0u64]],
            [],
        )
        .await;

        assert_tx_update_for_table(
            rx_for_b.recv(),
            u_id,
            &ProductType::from([AlgebraicType::U64, AlgebraicType::U64, AlgebraicType::U64]),
            [product![1u64, 0u64, 0u64]],
            [],
        )
        .await;

        Ok(())
    }

    /// Test that one client unsubscribing does not affect another
    ///
    /// Clients A and B subscribe to the same query modulo whitespace (sharing a
    /// compiled plan), then B unsubscribes. A must keep receiving updates, and
    /// the per-commit metrics must show that only A's single query remains.
    #[tokio::test]
    async fn test_unsubscribe_distinct_queries_same_plan() -> anyhow::Result<()> {
        let db = relational_db()?;

        let client_id_a = client_id_from_u8(1);
        let client_id_b = client_id_from_u8(2);

        // Establish a connection for each client
        let (tx_for_a, mut rx_for_a) = client_connection(client_id_a, &db);
        let (tx_for_b, mut rx_for_b) = client_connection(client_id_b, &db);

        let auth_a = AuthCtx::new(db.owner_identity(), client_id_a.identity);
        let auth_b = AuthCtx::new(db.owner_identity(), client_id_b.identity);

        let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());

        let u_id = db.create_table_for_test_with_the_works(
            "u",
            &[
                ("i", AlgebraicType::U64),
                ("a", AlgebraicType::U64),
                ("b", AlgebraicType::U64),
            ],
            &[0.into()],
            // The join column for this table does not have to be unique,
            // because pruning only requires us to probe the join index on `v`.
            &[],
            StAccess::Public,
        )?;
        let v_id = db.create_table_for_test_with_the_works(
            "v",
            &[
                ("i", AlgebraicType::U64),
                ("x", AlgebraicType::U64),
                ("y", AlgebraicType::U64),
            ],
            &[0.into(), 1.into()],
            &[0.into()],
            StAccess::Public,
        )?;

        // Seed `v` with the single row both queries join against.
        commit_tx(&db, &subs, [], [(v_id, product![1u64, 1u64, 1u64])])?;

        let mut query_ids = 0;

        // The two query strings differ only in whitespace.
        subscribe_multi(
            &subs,
            auth_a,
            &["select u.* from u join v on u.i = v.i where v.x = 1"],
            tx_for_a,
            &mut query_ids,
        )
        .await?;
        subscribe_multi(
            &subs,
            auth_b.clone(),
            &["select u.* from u join v on u.i = v.i where  v.x = 1"],
            tx_for_b.clone(),
            &mut query_ids,
        )
        .await?;

        // Wait for both subscriptions
        assert_matches!(
            rx_for_a.recv().await,
            Some(SerializableMessage::Subscription(SubscriptionMessage {
                result: SubscriptionResult::SubscribeMulti(_),
                ..
            }))
        );
        assert_matches!(
            rx_for_b.recv().await,
            Some(SerializableMessage::Subscription(SubscriptionMessage {
                result: SubscriptionResult::SubscribeMulti(_),
                ..
            }))
        );

        // Client B drops its subscription; client A's must survive.
        unsubscribe_multi(&subs, auth_b, tx_for_b, query_ids)?;

        assert_matches!(
            rx_for_b.recv().await,
            Some(SerializableMessage::Subscription(SubscriptionMessage {
                result: SubscriptionResult::UnsubscribeMulti(_),
                ..
            }))
        );

        // Insert a new row into `u`
        let metrics = commit_tx(&db, &subs, [], [(u_id, product![1u64, 0u64, 0u64])])?;

        // Client A still receives the update after B unsubscribed.
        assert_tx_update_for_table(
            rx_for_a.recv(),
            u_id,
            &ProductType::from([AlgebraicType::U64, AlgebraicType::U64, AlgebraicType::U64]),
            [product![1u64, 0u64, 0u64]],
            [],
        )
        .await;

        // We should only have evaluated a single query
        assert_eq!(metrics.delta_queries_evaluated, 1);
        assert_eq!(metrics.delta_queries_matched, 1);

        // Modify a matching row in `v`
        let metrics = commit_tx(
            &db,
            &subs,
            [(v_id, product![1u64, 1u64, 1u64])],
            [(v_id, product![1u64, 2u64, 2u64])],
        )?;

        // We should only have evaluated a single query
        assert_eq!(metrics.delta_queries_evaluated, 1);
        assert_eq!(metrics.delta_queries_matched, 1);

        Ok(())
    }

    /// Test that we do not evaluate queries that return trivially empty results.
    ///
    /// Table `t` is left empty, so every subscribed query over `t` is trivially
    /// empty and the initial evaluation should touch no rows and seek no indexes.
    ///
    /// Needs a multi-threaded tokio runtime so that the module subscription worker can run in parallel.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn test_query_pruning_for_empty_tables() -> anyhow::Result<()> {
        let db = relational_db()?;

        // A single client connection.
        let client = client_id_from_u8(1);
        let (sender, mut receiver) = client_connection(client, &db);

        let auth = AuthCtx::new(db.owner_identity(), client.identity);
        let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());

        // Create tables `t` and `s`, both `(id: u64, a: u64)` with an index on `id`.
        let columns = &[("id", AlgebraicType::U64), ("a", AlgebraicType::U64)];
        let indexed = &[0.into()];
        db.create_table_for_test("t", columns, indexed)?;
        let s_id = db.create_table_for_test("s", columns, indexed)?;

        // Insert one row into `s`, but leave `t` empty.
        commit_tx(&db, &subs, [], [(s_id, product![0u64, 0u64])])?;

        // Subscribe to queries whose results are trivially empty because `t` is.
        let metrics = subscribe_multi(
            &subs,
            auth,
            &[
                "select t.* from t where a = 0",
                "select t.* from t join s on t.id = s.id where s.a = 0",
                "select s.* from t join s on t.id = s.id where t.a = 0",
            ],
            sender,
            &mut 0,
        )
        .await?;

        assert_matches!(
            receiver.recv().await,
            Some(SerializableMessage::Subscription(SubscriptionMessage {
                result: SubscriptionResult::SubscribeMulti(_),
                ..
            }))
        );

        // Pruning should have avoided touching storage entirely.
        assert_eq!(metrics.rows_scanned, 0);
        assert_eq!(metrics.index_seeks, 0);

        Ok(())
    }

    /// Asserts that a subscription holds a tx handle for the entire length of its evaluation.
    ///
    /// A reader thread starts evaluating a subscription, signals a writer task,
    /// then sleeps inside the evaluation callback. The writer inserts a second
    /// row concurrently. If evaluation holds its tx for its full duration, the
    /// reader must still observe only the single pre-existing row.
    #[test]
    fn test_tx_subscription_ordering() -> ResultTest<()> {
        let test_db = TestDB::durable()?;

        let runtime = test_db.runtime().cloned().unwrap();
        let db = Arc::new(test_db.db.clone());

        // Create table with one row
        let table_id = db.create_table_for_test("T", &[("a", AlgebraicType::U8)], &[])?;
        with_auto_commit(&db, |tx| insert(&db, tx, table_id, &product!(1_u8)).map(drop))?;

        // Channel used purely as a "reader tx has started" signal to the writer.
        let (send, mut recv) = mpsc::unbounded_channel();

        // Subscribing to T should return a single row.
        let db2 = db.clone();
        let query_handle = runtime.spawn_blocking(move || {
            add_subscriber(
                db.clone(),
                "select * from T",
                Some(Arc::new(move |tx: &_| {
                    // Wake up writer thread after starting the reader tx
                    let _ = send.send(());
                    // Then go to sleep
                    std::thread::sleep(Duration::from_secs(1));
                    // Assuming subscription evaluation holds a lock on the db,
                    // any mutations to T will necessarily occur after,
                    // and therefore we should only see a single row returned.
                    assert_eq!(1, db.iter(tx, table_id).unwrap().count());
                })),
            )
        });

        // Write a second row to T concurrently with the reader thread
        let write_handle = runtime.spawn(async move {
            let _ = recv.recv().await;
            with_auto_commit(&db2, |tx| insert(&db2, tx, table_id, &product!(2_u8)).map(drop))
        });

        // Join both tasks, propagating panics/errors from either side.
        runtime.block_on(write_handle)??;
        runtime.block_on(query_handle)??;

        test_db.close()?;

        Ok(())
    }

    /// Subscriptions may read public tables but must be rejected outright for
    /// any query that references a private table, even indirectly via a join.
    #[test]
    fn subs_cannot_access_private_tables() -> ResultTest<()> {
        let test_db = TestDB::durable()?;
        let db = Arc::new(test_db.db.clone());

        // One public and one private table, both with schema `(a: u8)`.
        let columns = &[("a", AlgebraicType::U8)];
        let indexed = &[0.into()];
        let _ = db.create_table_for_test("public", columns, indexed)?;
        let _ = db.create_table_for_test_with_access("private", columns, indexed, StAccess::Private)?;

        let subscribe = |sql| add_subscriber(db.clone(), sql, None);

        // Subscribing to a public table is allowed.
        assert!(subscribe("SELECT * FROM public").is_ok());

        // Any query that mentions a private table is rejected —
        // even a join whose projection omits the private columns —
        // because joining alone can leak information from the private table.
        let rejected = [
            "SELECT * FROM private",
            // Even if the query will return no rows, we still reject it.
            "SELECT * FROM private WHERE false",
            "SELECT private.* FROM private",
            "SELECT public.* FROM public JOIN private ON public.a = private.a WHERE private.a = 1",
            "SELECT private.* FROM private JOIN public ON private.a = public.a WHERE public.a = 1",
        ];
        for sql in rejected {
            assert!(subscribe(sql).is_err());
        }

        Ok(())
    }

    /// Tests delivery semantics for clients with "confirmed reads" enabled.
    ///
    /// An unconfirmed client receives subscription results and tx updates as
    /// soon as they are committed; a confirmed client only receives them once
    /// the transaction is durable (gated here via `assert_after_durable` and
    /// the manually-driven durability handle).
    #[tokio::test]
    async fn test_confirmed_reads() -> anyhow::Result<()> {
        let (db, durability) = relational_db_with_manual_durability()?;

        let client_id_confirmed = client_id_from_u8(1);
        let client_id_unconfirmed = client_id_from_u8(2);

        // One connection with confirmed reads on, one with it off.
        let (tx_for_confirmed, mut rx_for_confirmed) =
            client_connection_with_confirmed_reads(client_id_confirmed, &db, true);
        let (tx_for_unconfirmed, mut rx_for_unconfirmed) =
            client_connection_with_confirmed_reads(client_id_unconfirmed, &db, false);

        let auth_confirmed = AuthCtx::new(db.owner_identity(), client_id_confirmed.identity);
        let auth_unconfirmed = AuthCtx::new(db.owner_identity(), client_id_unconfirmed.identity);

        let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());
        let table = db.create_table_for_test("t", &[("x", AlgebraicType::U8)], &[])?;
        let schema = ProductType::from([AlgebraicType::U8]);

        // Subscribe both clients.
        subscribe_multi(&subs, auth_confirmed, &["select * from t"], tx_for_confirmed, &mut 0).await?;
        subscribe_multi(
            &subs,
            auth_unconfirmed,
            &["select * from t"],
            tx_for_unconfirmed,
            &mut 0,
        )
        .await?;

        // The unconfirmed client gets its subscription result immediately ...
        assert_matches!(
            rx_for_unconfirmed.recv().await,
            Some(SerializableMessage::Subscription(SubscriptionMessage {
                result: SubscriptionResult::SubscribeMulti(_),
                ..
            }))
        );
        // ... while the confirmed client only gets it once durability advances.
        assert_after_durable(&durability, async {
            assert_matches!(
                rx_for_confirmed.recv().await,
                Some(SerializableMessage::Subscription(SubscriptionMessage {
                    result: SubscriptionResult::SubscribeMulti(_),
                    ..
                }))
            );
        })
        .await;

        // Insert a row.
        let mut tx = begin_mut_tx(&db);
        db.insert(&mut tx, table, &bsatn::to_vec(&product![1_u8])?)?;
        assert!(matches!(
            subs.commit_and_broadcast_event(None, module_event(), tx),
            Ok(Ok(_))
        ));
        // Insert another row, using SQL.
        let auth = AuthCtx::new(identity_from_u8(0), identity_from_u8(0));
        run(
            &db,
            "INSERT INTO t (x) VALUES (2)",
            auth,
            Some(&subs),
            None,
            &mut vec![],
        )
        .await?;

        // Unconfirmed client should have received both rows.
        assert_tx_update_for_table(rx_for_unconfirmed.recv(), table, &schema, [product![1_u8]], []).await;
        assert_tx_update_for_table(rx_for_unconfirmed.recv(), table, &schema, [product![2_u8]], []).await;

        // Confirmed client should receive the rows after the tx becomes durable.
        assert_after_durable(&durability, async {
            assert_tx_update_for_table(rx_for_confirmed.recv(), table, &schema, [product![1_u8]], []).await;
            assert_tx_update_for_table(rx_for_confirmed.recv(), table, &schema, [product![2_u8]], []).await
        })
        .await;

        Ok(())
    }
}
